Update scalafmt-core to 3.8.6 #176

Merged (3 commits) on Jan 24, 2025
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
@@ -0,0 +1,2 @@
# Scala Steward: Reformat with scalafmt 3.8.6
c5e0595371a88f64dd1f32c9f226cea46132ccc7
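For context: git blame only skips the revision listed above when it is pointed at this file. A minimal local setup, shown here as a hint rather than as part of this PR, uses the standard git configuration key:

git config blame.ignoreRevsFile .git-blame-ignore-revs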
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,6 +1,6 @@
preset = defaultWithAlign
//@formatter on
version = 3.7.17
version = 3.8.6
runner.dialect = scala3
align.preset = more
danglingParentheses.preset = true
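Most of this PR is mechanical reformatting produced by the version bump above. As an illustration of the most visible change in the diffs below (grounded in this PR's own hunks, with placeholder names), the new scalafmt layout moves implicit to the end of the opening line of a multi-line parameter clause:

object FormattingSketch {
  final case class Settings(readOnly: Boolean = true)

  // 3.7.17 layout previously used in this repository:
  //   def query(sql: String)(
  //       implicit settings: Settings = Settings()
  //   ): String = sql

  // 3.8.6 layout applied in this PR:
  def query(sql: String)(implicit
      settings: Settings = Settings()
  ): String = sql
}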
26 changes: 14 additions & 12 deletions build.sbt
@@ -6,13 +6,15 @@ sonatypeProfileName := "com.crobox"

lazy val root = (project in file("."))
.settings(
publish := {},
publish := {},
publishArtifact := false,
inThisBuild(
List(
organization := "com.crobox.clickhouse",
homepage := Some(url("https://github.com/crobox/clickhouse-scala-client")),
licenses := List("The GNU Lesser General Public License, Version 3.0" -> url("http://www.gnu.org/licenses/lgpl-3.0.txt")),
homepage := Some(url("https://github.com/crobox/clickhouse-scala-client")),
licenses := List(
"The GNU Lesser General Public License, Version 3.0" -> url("http://www.gnu.org/licenses/lgpl-3.0.txt")
),
developers := List(
Developer(
"crobox",
@@ -21,10 +23,10 @@ lazy val root = (project in file("."))
url("https://crobox.com")
)
),
scalaVersion := "2.13.16",
scalaVersion := "2.13.16",
crossScalaVersions := List("2.13.16", "3.3.1"),
javacOptions ++= Seq("-g", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "11", "-target", "11"),
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-language:_", "-encoding", "UTF-8"),
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-language:_", "-encoding", "UTF-8")
)
),
name := "clickhouse"
@@ -35,36 +37,36 @@ lazy val client: Project = (project in file("client"))
.configs(Config.CustomIntegrationTest)
.settings(Config.testSettings: _*)
.settings(
name := "client",
name := "client",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq(
"io.spray" %% "spray-json" % "1.3.6",
"org.apache.pekko" %% "pekko-actor" % PekkoVersion,
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"joda-time" % "joda-time" % "2.13.0"
) ++ Seq("org.apache.pekko" %% "pekko-testkit" % PekkoVersion % Test) ++ Build.testDependencies.map(_ % Test)
"joda-time" % "joda-time" % "2.13.0"
) ++ Seq("org.apache.pekko" %% "pekko-testkit" % PekkoVersion % Test) ++ Build.testDependencies.map(_ % Test)
)

lazy val dsl = (project in file("dsl"))
.dependsOn(client, client % "test->test", testkit % Test)
.configs(Config.CustomIntegrationTest)
.settings(Config.testSettings: _*)
.settings(
name := "dsl",
name := "dsl",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "33.4.0-jre",
"com.typesafe" % "config" % "1.4.3"
"com.google.guava" % "guava" % "33.4.0-jre",
"com.typesafe" % "config" % "1.4.3"
)
)
// .settings(excludeDependencies ++= Seq(ExclusionRule("org.apache.pekko")))

lazy val testkit = (project in file("testkit"))
.dependsOn(client)
.settings(
name := "testkit",
name := "testkit",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Build.testDependencies
)
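For readers consuming these modules rather than building them, a hedged consumer-side sbt sketch; the organization and module names come from the build definition above, the version is a placeholder, and it assumes the published artifact ids match the sbt module names:

libraryDependencies ++= Seq(
  "com.crobox.clickhouse" %% "client"  % "<release-version>",       // core HTTP client
  "com.crobox.clickhouse" %% "dsl"     % "<release-version>",       // query DSL, depends on client
  "com.crobox.clickhouse" %% "testkit" % "<release-version>" % Test // test helpers
)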
80 changes: 44 additions & 36 deletions client/src/main/scala/com/crobox/clickhouse/ClickhouseClient.scala
@@ -18,12 +18,14 @@ import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Async clickhouse client using Pekko Http and Streams
*
* @author Sjoerd Mulder
* @author
* Sjoerd Mulder
* @since 31-03-17
*/
class ClickhouseClient(configuration: Option[Config] = None,
override val customConnectionContext: Option[HttpsConnectionContext] = None)
extends ClickHouseExecutor
class ClickhouseClient(
configuration: Option[Config] = None,
override val customConnectionContext: Option[HttpsConnectionContext] = None
) extends ClickHouseExecutor
with ClickhouseResponseParser
with ClickhouseQueryBuilder {

@@ -42,33 +44,38 @@ class ClickhouseClient(configuration: Option[Config] = None,
/**
* Execute a read-only query on Clickhouse
*
* @param sql a valid Clickhouse SQL string
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL string
* @return
* Future with the result that clickhouse returns
*/
def query(sql: String)(implicit settings: QuerySettings = QuerySettings(ReadQueries)): Future[String] =
executeRequest(sql, settings.copy(readOnly = ReadQueries, idempotent = settings.idempotent.orElse(Some(true))))

/**
* Execute a read-only query on Clickhouse
* Experimental api, may change in the future.
* Execute a read-only query on Clickhouse Experimental api, may change in the future.
*
* @param sql a valid Clickhouse SQL string
* @return stream with the query progress (started/rejected/finished/failed) which materializes with the query result
* @param sql
* a valid Clickhouse SQL string
* @return
* stream with the query progress (started/rejected/finished/failed) which materializes with the query result
*/
def queryWithProgress(sql: String)(
implicit settings: QuerySettings = QuerySettings(ReadQueries)
def queryWithProgress(sql: String)(implicit
settings: QuerySettings = QuerySettings(ReadQueries)
): Source[QueryProgress, Future[String]] =
executeRequestWithProgress(
sql,
settings.copy(readOnly = ReadQueries, idempotent = settings.idempotent.orElse(Some(true)))
)

/**
* Execute a query that is modifying the state of the database. e.g. INSERT, SET, CREATE TABLE.
* For security purposes SELECT and SHOW queries are not allowed, use the .query() method for those.
* Execute a query that is modifying the state of the database. e.g. INSERT, SET, CREATE TABLE. For security purposes
* SELECT and SHOW queries are not allowed, use the .query() method for those.
*
* @param sql a valid Clickhouse SQL string
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL string
* @return
* Future with the result that clickhouse returns
*/
def execute(sql: String)(implicit settings: QuerySettings = QuerySettings(AllQueries)): Future[String] =
Future {
@@ -87,16 +94,17 @@ class ClickhouseClient(configuration: Option[Config] = None,
/**
* Creates a stream of the SQL query that will delimit the result from Clickhouse on new-line
*
* @param sql a valid Clickhouse SQL string
* @param sql
* a valid Clickhouse SQL string
*/
def source(sql: String)(implicit settings: QuerySettings = QuerySettings(ReadQueries)): Source[String, NotUsed] =
sourceByteString(sql).via(Framing.delimiter(ByteString("\n"), MaximumFrameLength)).map(_.utf8String)

/**
* Creates a stream of the SQL query that will emit every result as a ByteString
* It will not retry the queries.
* Creates a stream of the SQL query that will emit every result as a ByteString It will not retry the queries.
*
* @param sql a valid Clickhouse SQL string
* @param sql
* a valid Clickhouse SQL string
*/
def sourceByteString(
sql: String
@@ -108,22 +116,24 @@
.flatMapConcat(response => response.entity.withoutSizeLimit().dataBytes)

/**
* Accepts a source of Strings that it will stream to Clickhouse
* It will not retry the query as this will run the source once for every retry and might have
* unexpected consequences.
* Accepts a source of Strings that it will stream to Clickhouse It will not retry the query as this will run the
* source once for every retry and might have unexpected consequences.
*
* @param sql a valid Clickhouse SQL INSERT statement
* @param source the Source with strings
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL INSERT statement
* @param source
* the Source with strings
* @return
* Future with the result that clickhouse returns
*/
def sink(sql: String, source: Source[ByteString, Any])(
implicit settings: QuerySettings = QuerySettings(AllQueries)
def sink(sql: String, source: Source[ByteString, Any])(implicit
settings: QuerySettings = QuerySettings(AllQueries)
): Future[String] = {
val entity = HttpEntity.apply(ContentTypes.`text/plain(UTF-8)`, source)
executeRequestInternal(hostBalancer.nextHost, sql, queryIdentifier, settings, Option(entity), None)
}

val serverVersion: ClickhouseServerVersion = {
val serverVersion: ClickhouseServerVersion =
try {
val path = "crobox.clickhouse.server.version"
val cfg = configuration.getOrElse(ConfigFactory.load())
@@ -132,12 +142,11 @@ class ClickhouseClient(configuration: Option[Config] = None,
} else {
Await.result(
query("select version()")(QuerySettings(ReadQueries).copy(retries = Option(0)))
.recover {
case x: ClickhouseException =>
val key = "(version "
val idx = x.getMessage.indexOf(key)
if (idx > 0) x.getMessage.substring(idx + key.length, x.getMessage.indexOf(")", idx + key.length))
else "Unknown"
.recover { case x: ClickhouseException =>
val key = "(version "
val idx = x.getMessage.indexOf(key)
if (idx > 0) x.getMessage.substring(idx + key.length, x.getMessage.indexOf(")", idx + key.length))
else "Unknown"
}
.map(ClickhouseServerVersion(_)),
5.seconds
@@ -151,5 +160,4 @@ class ClickhouseClient(configuration: Option[Config] = None,
logger.error(s"Can't determine Clickhouse Server Version. Falling back to: $latest. Error: ${x.getMessage}", x)
latest
}
}
}
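The Scaladoc above documents the client API; here is a minimal usage sketch based only on the signatures visible in this diff. It assumes a ClickHouse node reachable through the default configuration, takes the package com.crobox.clickhouse from the file path above, and uses a hypothetical table name my_table:

import com.crobox.clickhouse.ClickhouseClient
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import scala.concurrent.ExecutionContext.Implicits.global

object ClientSketch extends App {
  // Both constructor arguments (configuration, customConnectionContext) are optional.
  val client = new ClickhouseClient()

  // Read-only query: returns the raw response body.
  client.query("SELECT version()").foreach(println)

  // Streaming insert: the source is streamed to ClickHouse as the request entity.
  val rows = Source.single(ByteString("1\n2\n3\n"))
  client.sink("INSERT INTO my_table FORMAT TSV", rows).foreach(_ => println("insert accepted"))
}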
ClusterAwareHostBalancer.scala
@@ -13,30 +13,31 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

/**
* Host balancer that does a round robin on all the entries found in the `system.clusters` table.
* It assumes that the service itself can access directly the clickhouse nodes and that the default port `8123` is used
* for every node.
**/
case class ClusterAwareHostBalancer(host: Uri,
cluster: String = "cluster",
manager: ActorRef,
scanningInterval: FiniteDuration)(
implicit system: ActorSystem,
* Host balancer that does a round robin on all the entries found in the `system.clusters` table. It assumes that the
* service itself can access directly the clickhouse nodes and that the default port `8123` is used for every node.
*/
case class ClusterAwareHostBalancer(
host: Uri,
cluster: String = "cluster",
manager: ActorRef,
scanningInterval: FiniteDuration
)(implicit
system: ActorSystem,
connectionRetrievalTimeout: Timeout,
ec: ExecutionContext
) extends HostBalancer {

ClusterConnectionFlow
.clusterConnectionsFlow(Future.successful(host), scanningInterval, cluster)
.withAttributes(
ActorAttributes.supervisionStrategy({
ActorAttributes.supervisionStrategy {
case ex: IllegalArgumentException =>
logger.error("Failed resolving hosts for cluster, stopping the flow.", ex)
Supervision.stop
case ex =>
logger.error("Failed resolving hosts for cluster, resuming.", ex)
Supervision.Resume
})
}
)
.runWith(Sink.actorRef(manager, LogDeadConnections, throwable => logger.error(throwable.getMessage, throwable)))

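The flow above attaches a supervision strategy so that an IllegalArgumentException from the cluster lookup stops the stream while any other failure resumes it. A standalone, hedged sketch of how that Pekko Streams pattern behaves in general (not the balancer's actual flow):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.stream.{ActorAttributes, Supervision}

object SupervisionSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  Source(1 to 5)
    .map(n => if (n == 3) throw new RuntimeException("transient failure") else n)
    .withAttributes(ActorAttributes.supervisionStrategy {
      case _: IllegalArgumentException => Supervision.Stop   // treat as fatal: stop the stream
      case _                           => Supervision.Resume // transient: drop the failing element
    })
    .runWith(Sink.foreach(println)) // prints 1, 2, 4, 5
    .onComplete(_ => system.terminate())(system.dispatcher)
}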
HostBalancer.scala
@@ -30,12 +30,14 @@ object HostBalancer extends ClickhouseHostBuilder {
case SingleHost => SingleHostBalancer(connectionHostFromConfig)
case BalancingHosts =>
val manager = system.actorOf(ConnectionManagerActor.props(ClickhouseHostHealth.healthFlow(_)))
MultiHostBalancer(connectionConfig
.getConfigList("hosts")
.asScala
.toSet
.map((config: Config) => extractHost(config)),
manager)
MultiHostBalancer(
connectionConfig
.getConfigList("hosts")
.asScala
.toSet
.map((config: Config) => extractHost(config)),
manager
)
case ClusterAware =>
val manager = system.actorOf(ConnectionManagerActor.props(ClickhouseHostHealth.healthFlow(_)))
ClusterAwareHostBalancer(
@@ -48,6 +50,8 @@
}

def extractHost(connectionConfig: Config): Uri =
toHost(connectionConfig.getString("host"),
if (connectionConfig.hasPath("port")) Option(connectionConfig.getInt("port")) else None)
toHost(
connectionConfig.getString("host"),
if (connectionConfig.hasPath("port")) Option(connectionConfig.getInt("port")) else None
)
}
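extractHost above reads a mandatory host and an optional port from a Typesafe Config block. A small sketch of the expected shape; ConfigFactory comes from the config dependency already declared in build.sbt, while the HostBalancer import path is assumed rather than shown in this diff:

import com.typesafe.config.ConfigFactory
// Assumed package; this diff does not show the file's path.
import com.crobox.clickhouse.balancing.HostBalancer

object ExtractHostSketch extends App {
  val withPort    = ConfigFactory.parseString("host = \"db-1\"\nport = 8123")
  val withoutPort = ConfigFactory.parseString("host = \"db-2\"")

  println(HostBalancer.extractHost(withPort))    // Uri built with the explicit port
  println(HostBalancer.extractHost(withoutPort)) // port is left to toHost's default
}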
SingleHostBalancer.scala
@@ -6,7 +6,7 @@ import scala.concurrent.Future

/**
* The default host balancer which always provides the same host.
**/
*/
case class SingleHostBalancer(host: Uri) extends HostBalancer {

override def nextHost: Future[Uri] = Future.successful(host)
ConnectionManagerActor.scala
@@ -12,16 +12,18 @@ import com.typesafe.config.Config
import scala.collection.mutable
import scala.concurrent.duration._

class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, Cancellable],
optionalConfig: Option[Config])(
implicit materializer: Materializer
class ConnectionManagerActor(
healthSource: Uri => Source[ClickhouseHostStatus, Cancellable],
optionalConfig: Option[Config]
)(implicit
materializer: Materializer
) extends Actor
with ActorLogging
with Stash {

import ConnectionManagerActor._

private val config = optionalConfig.getOrElse(context.system.settings.config).getConfig("connection")
private val config = optionalConfig.getOrElse(context.system.settings.config).getConfig("connection")
private val fallbackToConfigurationHost = config.getBoolean("fallback-to-config-host-during-initialization")

// state
@@ -38,7 +40,7 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
override def receive: Receive = {
case Connections(hosts) =>
hosts
.foreach(host => {
.foreach(host =>
if (!currentConfiguredHosts.contains(host)) {
log.info(s"Setting up host health checks for host $host")
hostHealthScheduler.put(
@@ -50,7 +52,7 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
.run()
)
}
})
)
currentConfiguredHosts = hosts

case GetConnection() =>
@@ -99,8 +101,8 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
}

case LogDeadConnections =>
val deadHosts = hostsStatus.values.collect {
case Dead(host, _) => host
val deadHosts = hostsStatus.values.collect { case Dead(host, _) =>
host
}
if (deadHosts.nonEmpty)
log.error(s"Hosts ${deadHosts.mkString(" - ")} are still unreachable")