From 1259f557149651b499134311c3079cb54b3492c5 Mon Sep 17 00:00:00 2001 From: zizhao Date: Wed, 1 Nov 2023 04:43:52 +0200 Subject: [PATCH] add conf for server --- .../shuffle/ucx/ExternalUcxServerConf.scala | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/spark/shuffle/ucx/ExternalUcxServerConf.scala b/src/main/scala/org/apache/spark/shuffle/ucx/ExternalUcxServerConf.scala index 69274bf9..6b9be6e2 100644 --- a/src/main/scala/org/apache/spark/shuffle/ucx/ExternalUcxServerConf.scala +++ b/src/main/scala/org/apache/spark/shuffle/ucx/ExternalUcxServerConf.scala @@ -13,6 +13,32 @@ import org.apache.spark.SparkConf * Plugin configuration properties. */ class ExternalUcxServerConf(val yarnConf: Configuration) extends ExternalUcxConf { + override lazy val preallocateBuffersMap: Map[Long, Int] = yarnConf.get( + ExternalUcxConf.PREALLOCATE_BUFFERS_KEY).split(",").withFilter(s => s.nonEmpty) + .map(entry => entry.split(":") match { + case Array(bufferSize, bufferCount) => (bufferSize.toLong, bufferCount.toInt) + }).toMap + + override lazy val minBufferSize: Long = yarnConf.getLong( + ExternalUcxConf.MIN_BUFFER_SIZE_KEY, + ExternalUcxConf.MIN_BUFFER_SIZE_DEFAULT) + + override lazy val minRegistrationSize: Int = yarnConf.getInt( + ExternalUcxConf.MIN_REGISTRATION_SIZE_KEY, + ExternalUcxConf.MIN_REGISTRATION_SIZE_DEFAULT) + + override lazy val listenerAddress: String = yarnConf.get( + ExternalUcxConf.SOCKADDR_KEY, + ExternalUcxConf.SOCKADDR_DEFAULT) + + override lazy val useWakeup: Boolean = yarnConf.getBoolean( + ExternalUcxConf.WAKEUP_FEATURE_KEY, + ExternalUcxConf.WAKEUP_FEATURE_DEFAULT) + + override lazy val numListenerThreads: Int = yarnConf.getInt( + ExternalUcxConf.NUM_LISTNER_THREADS_KEY, + ExternalUcxConf.NUM_LISTNER_THREADS_DEFAULT) + override lazy val ucxServerPort: Int = yarnConf.getInt( ExternalUcxConf.SPARK_UCX_SHUFFLE_SERVICE_PORT_KEY, ExternalUcxConf.SPARK_UCX_SHUFFLE_SERVICE_PORT_DEFAULT) @@ -22,7 +48,7 @@ class ExternalUcxServerConf(val yarnConf: Configuration) extends ExternalUcxConf ExternalUcxServerConf.SPARK_UCX_SHUFFLE_EPS_NUM_DEFAULT) } -object ExternalUcxServerConf { +object ExternalUcxServerConf extends ExternalUcxConf { lazy val SPARK_UCX_SHUFFLE_EPS_NUM_KEY = "spark.shuffle.ucx.eps.num" lazy val SPARK_UCX_SHUFFLE_EPS_NUM_DEFAULT = 1024