diff --git a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
index 1fe20305d..50b37a6e1 100644
--- a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala
@@ -24,36 +24,7 @@ import scala.concurrent.duration._
* Use [[AdminClientSettings#apply]] for the default settings, and
* then apply any desired modifications on top of that instance.
*/
-sealed abstract class AdminClientSettings {
-
- /**
- * Properties which can be provided when creating a Java `KafkaAdminClient`
- * instance. Numerous functions in [[AdminClientSettings]] add properties
- * here if the settings are used by the Java `KafkaAdminClient`.
- */
- def properties: Map[String, String]
-
- /**
- * Returns a new [[AdminClientSettings]] instance with the specified
- * bootstrap servers. This is equivalent to setting the following
- * property using the [[withProperty]] function.
- *
- * {{{
- * AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG
- * }}}
- */
- def withBootstrapServers(bootstrapServers: String): AdminClientSettings
-
- /**
- * Returns a new [[AdminClientSettings]] instance with the specified
- * client id. This is equivalent to setting the following property
- * using the [[withProperty]] function.
- *
- * {{{
- * AdminClientConfig.CLIENT_ID_CONFIG
- * }}}
- */
- def withClientId(clientId: String): AdminClientSettings
+sealed abstract class AdminClientSettings extends KafkaClientSettings[AdminClientSettings] {
/**
* Returns a new [[AdminClientSettings]] instance with the specified
@@ -103,18 +74,6 @@ sealed abstract class AdminClientSettings {
*/
def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings
- /**
- * Returns a new [[AdminClientSettings]] instance with the specified
- * request timeout. This is equivalent to setting the following
- * property using the [[withProperty]] function, except you can
- * specify it with a `FiniteDuration` instead of a `String`.
- *
- * {{{
- * AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG
- * }}}
- */
- def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings
-
/**
* Returns a new [[AdminClientSettings]] instance with the specified
* max metadata age. This is equivalent to setting the following
@@ -138,44 +97,6 @@ sealed abstract class AdminClientSettings {
* }}}
*/
def withRetries(retries: Int): AdminClientSettings
-
- /**
- * Includes a property with the specified `key` and `value`.
- * The key should be one of the keys in `AdminClientConfig`,
- * and the value should be a valid choice for the key.
- */
- def withProperty(key: String, value: String): AdminClientSettings
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `AdminClientConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: (String, String)*): AdminClientSettings
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `AdminClientConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: Map[String, String]): AdminClientSettings
-
- /**
- * The time to wait for the Java `KafkaAdminClient` to shutdown.
- *
- * The default value is 20 seconds.
- */
- def closeTimeout: FiniteDuration
-
- /**
- * Creates a new [[AdminClientSettings]] with the specified [[closeTimeout]].
- */
- def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings
-
- /**
- * Includes the credentials properties from the provided [[KafkaCredentialStore]]
- */
- def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings
}
object AdminClientSettings {
@@ -184,12 +105,6 @@ object AdminClientSettings {
override val closeTimeout: FiniteDuration
) extends AdminClientSettings {
- override def withBootstrapServers(bootstrapServers: String): AdminClientSettings =
- withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-
- override def withClientId(clientId: String): AdminClientSettings =
- withProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
-
override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings =
withProperty(
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG,
@@ -215,30 +130,18 @@ object AdminClientSettings {
connectionsMaxIdle.toMillis.toString
)
- override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings =
- withProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString)
-
override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings =
withProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAge.toMillis.toString)
override def withRetries(retries: Int): AdminClientSettings =
withProperty(AdminClientConfig.RETRIES_CONFIG, retries.toString)
- override def withProperty(key: String, value: String): AdminClientSettings =
- copy(properties = properties.updated(key, value))
-
- override def withProperties(properties: (String, String)*): AdminClientSettings =
- copy(properties = this.properties ++ properties.toMap)
-
override def withProperties(properties: Map[String, String]): AdminClientSettings =
copy(properties = this.properties ++ properties)
override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings =
copy(closeTimeout = closeTimeout)
- override def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings =
- withProperties(credentialsStore.properties)
-
override def toString: String =
s"AdminClientSettings(closeTimeout = $closeTimeout)"
}
diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
index 6e1582f83..619ffdcbc 100644
--- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -7,7 +7,6 @@
package fs2.kafka
import cats.{Applicative, Show}
-import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -35,7 +34,8 @@ import scala.concurrent.duration._
*
* Use `ConsumerSettings#apply` to create a new instance.
*/
-sealed abstract class ConsumerSettings[F[_], K, V] {
+sealed abstract class ConsumerSettings[F[_], K, V]
+ extends KafkaClientSettings[ConsumerSettings[F, K, V]] {
/**
* The `Deserializer` to use for deserializing record keys.
@@ -65,24 +65,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V]
- /**
- * Properties which can be provided when creating a Java `KafkaConsumer`
- * instance. Numerous functions in [[ConsumerSettings]] add properties
- * here if the settings are used by the Java `KafkaConsumer`.
- */
- def properties: Map[String, String]
-
- /**
- * Returns a new [[ConsumerSettings]] instance with the specified
- * bootstrap servers. This is equivalent to setting the following
- * property using the [[withProperty]] function.
- *
- * {{{
- * ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
- * }}}
- */
- def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V]
-
/**
* Returns a new [[ConsumerSettings]] instance with the specified
* auto offset reset. This is equivalent to setting the following
@@ -95,17 +77,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V]
- /**
- * Returns a new [[ConsumerSettings]] instance with the specified
- * client id. This is equivalent to setting the following property
- * using the [[withProperty]] function.
- *
- * {{{
- * ConsumerConfig.CLIENT_ID_CONFIG
- * }}}
- */
- def withClientId(clientId: String): ConsumerSettings[F, K, V]
-
/**
* Returns a new [[ConsumerSettings]] instance with the specified
* group id. This is equivalent to setting the following property
@@ -202,18 +173,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAutoCommitInterval(autoCommitInterval: FiniteDuration): ConsumerSettings[F, K, V]
- /**
- * Returns a new [[ConsumerSettings]] instance with the specified
- * request timeout. This is equivalent to setting the following
- * property using the [[withProperty]] function, except you can
- * specify it with a `FiniteDuration` instead of a `String`.
- *
- * {{{
- * ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
- * }}}
- */
- def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V]
-
/**
* Returns a new [[ConsumerSettings]] instance with the specified
* default api timeout. This is equivalent to setting the following
@@ -250,50 +209,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
*/
def withAllowAutoCreateTopics(allowAutoCreateTopics: Boolean): ConsumerSettings[F, K, V]
- /**
- * Returns a new [[ConsumerSettings]] instance with the specified
- * client rack. This is equivalent to setting the following
- * property using the [[withProperty]] function.
- *
- * {{{
- * ConsumerConfig.CLIENT_RACK_CONFIG
- * }}}
- */
- def withClientRack(clientRack: String): ConsumerSettings[F, K, V]
-
- /**
- * Includes a property with the specified `key` and `value`.
- * The key should be one of the keys in `ConsumerConfig`,
- * and the value should be a valid choice for the key.
- */
- def withProperty(key: String, value: String): ConsumerSettings[F, K, V]
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `ConsumerConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V]
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `ConsumerConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V]
-
- /**
- * The time to wait for the Java `KafkaConsumer` to shutdown.
- *
- * The default value is 20 seconds.
- */
- def closeTimeout: FiniteDuration
-
- /**
- * Creates a new [[ConsumerSettings]] with the specified [[closeTimeout]].
- */
- def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[F, K, V]
-
/**
* The time to wait for offset commits to complete. If an offset commit
* doesn't complete within this time, a [[CommitTimeoutException]] will
@@ -386,11 +301,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
* instead be set to `2` and not the specified value.
*/
def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V]
-
- /**
- * Includes the credentials properties from the provided [[KafkaCredentialStore]]
- */
- def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V]
}
object ConsumerSettings {
@@ -410,9 +320,6 @@ object ConsumerSettings {
override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))
- override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] =
- withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-
override def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V] =
withProperty(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
@@ -423,9 +330,6 @@ object ConsumerSettings {
}
)
- override def withClientId(clientId: String): ConsumerSettings[F, K, V] =
- withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-
override def withGroupId(groupId: String): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
@@ -457,12 +361,6 @@ object ConsumerSettings {
autoCommitInterval.toMillis.toString
)
- override def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V] =
- withProperty(
- ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
- requestTimeout.toMillis.toString
- )
-
override def withDefaultApiTimeout(
defaultApiTimeout: FiniteDuration
): ConsumerSettings[F, K, V] =
@@ -485,15 +383,6 @@ object ConsumerSettings {
): ConsumerSettings[F, K, V] =
withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, allowAutoCreateTopics.toString)
- override def withClientRack(clientRack: String): ConsumerSettings[F, K, V] =
- withProperty(ConsumerConfig.CLIENT_RACK_CONFIG, clientRack)
-
- override def withProperty(key: String, value: String): ConsumerSettings[F, K, V] =
- copy(properties = properties.updated(key, value))
-
- override def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V] =
- copy(properties = this.properties ++ properties.toMap)
-
override def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V] =
copy(properties = this.properties ++ properties)
@@ -520,11 +409,6 @@ object ConsumerSettings {
override def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] =
copy(maxPrefetchBatches = Math.max(2, maxPrefetchBatches))
- override def withCredentials(
- credentialsStore: KafkaCredentialStore
- ): ConsumerSettings[F, K, V] =
- withProperties(credentialsStore.properties)
-
override def toString: String =
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"
}
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala b/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala
new file mode 100644
index 000000000..c11bd8faf
--- /dev/null
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala
@@ -0,0 +1,111 @@
+package fs2.kafka
+
+import fs2.kafka.security.KafkaCredentialStore
+import org.apache.kafka.clients.CommonClientConfigs
+
+import scala.concurrent.duration._
+
+/**
+ * Settings common to producers, consumers, and admin clients. Implemented by
+ * [[ConsumerSettings]], [[ProducerSettings]], and [[AdminClientSettings]].
+ */
+private[kafka] trait KafkaClientSettings[Self <: KafkaClientSettings[Self]] { self =>
+
+ /**
+ * Properties which can be provided when creating a Java `KafkaProducer`
+ * instance. Numerous functions in `Settings` classes add properties
+ * here if the settings are used by the Java Kakfa client.
+ */
+ def properties: Map[String, String]
+
+ /**
+ * Includes a property with the specified `key` and `value`.
+ * The key should be one of the config keys for the underlying,
+ * Java client, and the value should be a valid choice for the key.
+ */
+ final def withProperty(key: String, value: String): Self =
+ withProperties(properties.updated(key, value))
+
+ /**
+ * Includes the specified keys and values as properties.
+ * The keys should be among the config keys for the underlying,
+ * Java client, and the value should be a valid choice for the key.
+ */
+ final def withProperties(properties: (String, String)*): Self =
+ withProperties(this.properties ++ properties.toMap)
+
+ /**
+ * Includes the specified keys and values as properties.
+ * The keys should be among the config keys for the underlying,
+ * Java client, and the value should be a valid choice for the key.
+ */
+ def withProperties(properties: Map[String, String]): Self
+
+ /**
+ * Returns a new `Settings` instance with the specified
+ * bootstrap servers. This is equivalent to setting the following
+ * property using the [[withProperty]] function.
+ *
+ * {{{
+ * CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
+ * }}}
+ */
+ final def withBootstrapServers(bootstrapServers: String): Self =
+ withProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+
+ /**
+ * Returns a new `Settings` instance with the specified
+ * client id. This is equivalent to setting the following property
+ * using the [[withProperty]] function.
+ *
+ * {{{
+ * ProducerConfig.CLIENT_ID_CONFIG
+ * }}}
+ */
+ final def withClientId(clientId: String): Self =
+ withProperty(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)
+
+ /**
+ * Returns a new `Settings` instance with the specified
+ * client rack. This is equivalent to setting the following
+ * property using the [[withProperty]] function.
+ *
+ * {{{
+ * CommonClientConfigs.CLIENT_RACK_CONFIG
+ * }}}
+ */
+ final def withClientRack(clientRack: String): Self =
+ withProperty(CommonClientConfigs.CLIENT_RACK_CONFIG, clientRack)
+
+ /**
+ * Includes the credentials properties from the provided [[KafkaCredentialStore]]
+ */
+ final def withCredentials(credentialsStore: KafkaCredentialStore): Self =
+ withProperties(credentialsStore.properties)
+
+ /**
+ * Returns a new `Settings` instance with the specified
+ * request timeout. This is equivalent to setting the following
+ * property using the [[withProperty]] function, except you can
+ * specify it with a `FiniteDuration` instead of a `String`.
+ *
+ * {{{
+ * CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
+ * }}}
+ */
+ final def withRequestTimeout(requestTimeout: FiniteDuration): Self =
+ withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString)
+
+ /**
+ * The time to wait for the Java client to shutdown.
+ *
+ * The default value is 60 seconds for [[ConsumerSettings]], 20 seconds for [[ProducerSettings]],
+ * and 2 seconds for [[AdminClientSettings]].
+ */
+ def closeTimeout: FiniteDuration
+
+ /**
+ * Creates a new `Settings` instance with the specified [[closeTimeout]].
+ */
+ def withCloseTimeout(closeTimeout: FiniteDuration): Self
+}
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
index fd7e30b14..e2b2d7a7d 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala
@@ -7,7 +7,6 @@
package fs2.kafka
import cats.{Applicative, Show}
-import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.ExecutionContext
@@ -26,7 +25,8 @@ import scala.concurrent.duration._
*
* Use `ProducerSettings#apply` to create a new instance.
*/
-sealed abstract class ProducerSettings[F[_], K, V] {
+sealed abstract class ProducerSettings[F[_], K, V]
+ extends KafkaClientSettings[ProducerSettings[F, K, V]] {
/**
* The `Serializer` to use for serializing record keys.
@@ -54,24 +54,6 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V]
- /**
- * Properties which can be provided when creating a Java `KafkaProducer`
- * instance. Numerous functions in [[ProducerSettings]] add properties
- * here if the settings are used by the Java `KafkaProducer`.
- */
- def properties: Map[String, String]
-
- /**
- * Returns a new [[ProducerSettings]] instance with the specified
- * bootstrap servers. This is equivalent to setting the following
- * property using the [[withProperty]] function.
- *
- * {{{
- * ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
- * }}}
- */
- def withBootstrapServers(bootstrapServers: String): ProducerSettings[F, K, V]
-
/**
* Returns a new [[ProducerSettings]] instance with the specified
* acknowledgements. This is equivalent to setting the following
@@ -96,17 +78,6 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def withBatchSize(batchSize: Int): ProducerSettings[F, K, V]
- /**
- * Returns a new [[ProducerSettings]] instance with the specified
- * client id. This is equivalent to setting the following property
- * using the [[withProperty]] function.
- *
- * {{{
- * ProducerConfig.CLIENT_ID_CONFIG
- * }}}
- */
- def withClientId(clientId: String): ProducerSettings[F, K, V]
-
/**
* Returns a new [[ProducerSettings]] instance with the specified
* retries. This is equivalent to setting the following property
@@ -158,18 +129,6 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def withLinger(linger: FiniteDuration): ProducerSettings[F, K, V]
- /**
- * Returns a new [[ProducerSettings]] instance with the specified
- * request timeout. This is equivalent to setting the following
- * property using the [[withProperty]] function, except you can
- * specify it with a `FiniteDuration` instead of a `String`.
- *
- * {{{
- * ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
- * }}}
- */
- def withRequestTimeout(requestTimeout: FiniteDuration): ProducerSettings[F, K, V]
-
/**
* Returns a new [[ProducerSettings]] instance with the specified
* delivery timeout. This is equivalent to setting the following
@@ -182,39 +141,6 @@ sealed abstract class ProducerSettings[F[_], K, V] {
*/
def withDeliveryTimeout(deliveryTimeout: FiniteDuration): ProducerSettings[F, K, V]
- /**
- * Includes a property with the specified `key` and `value`.
- * The key should be one of the keys in `ProducerConfig`,
- * and the value should be a valid choice for the key.
- */
- def withProperty(key: String, value: String): ProducerSettings[F, K, V]
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `ProducerConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: (String, String)*): ProducerSettings[F, K, V]
-
- /**
- * Includes the specified keys and values as properties. The
- * keys should be part of the `ProducerConfig` keys, and
- * the values should be valid choices for the keys.
- */
- def withProperties(properties: Map[String, String]): ProducerSettings[F, K, V]
-
- /**
- * The time to wait for the Java `KafkaProducer` to shutdown.
- *
- * The default value is 60 seconds.
- */
- def closeTimeout: FiniteDuration
-
- /**
- * Creates a new [[ProducerSettings]] with the specified [[closeTimeout]].
- */
- def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[F, K, V]
-
/**
* The maximum number of [[ProducerRecords]] to produce in the same batch.
*
@@ -226,11 +152,6 @@ sealed abstract class ProducerSettings[F[_], K, V] {
* Creates a new [[ProducerSettings]] with the specified [[parallelism]].
*/
def withParallelism(parallelism: Int): ProducerSettings[F, K, V]
-
- /**
- * Includes the credentials properties from the provided [[KafkaCredentialStore]]
- */
- def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V]
}
object ProducerSettings {
@@ -245,9 +166,6 @@ object ProducerSettings {
override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] =
copy(customBlockingContext = Some(ec))
- override def withBootstrapServers(bootstrapServers: String): ProducerSettings[F, K, V] =
- withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
-
override def withAcks(acks: Acks): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.ACKS_CONFIG, acks match {
case Acks.ZeroAcks => "0"
@@ -258,9 +176,6 @@ object ProducerSettings {
override def withBatchSize(batchSize: Int): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
- override def withClientId(clientId: String): ProducerSettings[F, K, V] =
- withProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId)
-
override def withRetries(retries: Int): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.RETRIES_CONFIG, retries.toString)
@@ -278,18 +193,9 @@ object ProducerSettings {
override def withLinger(linger: FiniteDuration): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.LINGER_MS_CONFIG, linger.toMillis.toString)
- override def withRequestTimeout(requestTimeout: FiniteDuration): ProducerSettings[F, K, V] =
- withProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString)
-
override def withDeliveryTimeout(deliveryTimeout: FiniteDuration): ProducerSettings[F, K, V] =
withProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout.toMillis.toString)
- override def withProperty(key: String, value: String): ProducerSettings[F, K, V] =
- copy(properties = properties.updated(key, value))
-
- override def withProperties(properties: (String, String)*): ProducerSettings[F, K, V] =
- copy(properties = this.properties ++ properties.toMap)
-
override def withProperties(properties: Map[String, String]): ProducerSettings[F, K, V] =
copy(properties = this.properties ++ properties)
@@ -299,14 +205,6 @@ object ProducerSettings {
override def withParallelism(parallelism: Int): ProducerSettings[F, K, V] =
copy(parallelism = parallelism)
- /**
- * Includes the credentials properties from the provided [[KafkaCredentialStore]]
- */
- override def withCredentials(
- credentialsStore: KafkaCredentialStore
- ): ProducerSettings[F, K, V] =
- withProperties(credentialsStore.properties)
-
override def toString: String =
s"ProducerSettings(closeTimeout = $closeTimeout)"
}