diff --git a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala index 1fe20305d..50b37a6e1 100644 --- a/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/AdminClientSettings.scala @@ -24,36 +24,7 @@ import scala.concurrent.duration._ * Use [[AdminClientSettings#apply]] for the default settings, and * then apply any desired modifications on top of that instance. */ -sealed abstract class AdminClientSettings { - - /** - * Properties which can be provided when creating a Java `KafkaAdminClient` - * instance. Numerous functions in [[AdminClientSettings]] add properties - * here if the settings are used by the Java `KafkaAdminClient`. - */ - def properties: Map[String, String] - - /** - * Returns a new [[AdminClientSettings]] instance with the specified - * bootstrap servers. This is equivalent to setting the following - * property using the [[withProperty]] function. - * - * {{{ - * AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG - * }}} - */ - def withBootstrapServers(bootstrapServers: String): AdminClientSettings - - /** - * Returns a new [[AdminClientSettings]] instance with the specified - * client id. This is equivalent to setting the following property - * using the [[withProperty]] function. - * - * {{{ - * AdminClientConfig.CLIENT_ID_CONFIG - * }}} - */ - def withClientId(clientId: String): AdminClientSettings +sealed abstract class AdminClientSettings extends KafkaClientSettings[AdminClientSettings] { /** * Returns a new [[AdminClientSettings]] instance with the specified @@ -103,18 +74,6 @@ sealed abstract class AdminClientSettings { */ def withConnectionsMaxIdle(connectionsMaxIdle: FiniteDuration): AdminClientSettings - /** - * Returns a new [[AdminClientSettings]] instance with the specified - * request timeout. This is equivalent to setting the following - * property using the [[withProperty]] function, except you can - * specify it with a `FiniteDuration` instead of a `String`. - * - * {{{ - * AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG - * }}} - */ - def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings - /** * Returns a new [[AdminClientSettings]] instance with the specified * max metadata age. This is equivalent to setting the following @@ -138,44 +97,6 @@ sealed abstract class AdminClientSettings { * }}} */ def withRetries(retries: Int): AdminClientSettings - - /** - * Includes a property with the specified `key` and `value`. - * The key should be one of the keys in `AdminClientConfig`, - * and the value should be a valid choice for the key. - */ - def withProperty(key: String, value: String): AdminClientSettings - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `AdminClientConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: (String, String)*): AdminClientSettings - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `AdminClientConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: Map[String, String]): AdminClientSettings - - /** - * The time to wait for the Java `KafkaAdminClient` to shutdown.
- *
- * The default value is 20 seconds. - */ - def closeTimeout: FiniteDuration - - /** - * Creates a new [[AdminClientSettings]] with the specified [[closeTimeout]]. - */ - def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings - - /** - * Includes the credentials properties from the provided [[KafkaCredentialStore]] - */ - def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings } object AdminClientSettings { @@ -184,12 +105,6 @@ object AdminClientSettings { override val closeTimeout: FiniteDuration ) extends AdminClientSettings { - override def withBootstrapServers(bootstrapServers: String): AdminClientSettings = - withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) - - override def withClientId(clientId: String): AdminClientSettings = - withProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) - override def withReconnectBackoff(reconnectBackoff: FiniteDuration): AdminClientSettings = withProperty( AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, @@ -215,30 +130,18 @@ object AdminClientSettings { connectionsMaxIdle.toMillis.toString ) - override def withRequestTimeout(requestTimeout: FiniteDuration): AdminClientSettings = - withProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString) - override def withMetadataMaxAge(metadataMaxAge: FiniteDuration): AdminClientSettings = withProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAge.toMillis.toString) override def withRetries(retries: Int): AdminClientSettings = withProperty(AdminClientConfig.RETRIES_CONFIG, retries.toString) - override def withProperty(key: String, value: String): AdminClientSettings = - copy(properties = properties.updated(key, value)) - - override def withProperties(properties: (String, String)*): AdminClientSettings = - copy(properties = this.properties ++ properties.toMap) - override def withProperties(properties: Map[String, String]): AdminClientSettings = copy(properties = this.properties ++ properties) override def withCloseTimeout(closeTimeout: FiniteDuration): AdminClientSettings = copy(closeTimeout = closeTimeout) - override def withCredentials(credentialsStore: KafkaCredentialStore): AdminClientSettings = - withProperties(credentialsStore.properties) - override def toString: String = s"AdminClientSettings(closeTimeout = $closeTimeout)" } diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala index 6e1582f83..619ffdcbc 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala @@ -7,7 +7,6 @@ package fs2.kafka import cats.{Applicative, Show} -import fs2.kafka.security.KafkaCredentialStore import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.requests.OffsetFetchResponse @@ -35,7 +34,8 @@ import scala.concurrent.duration._ *
* Use `ConsumerSettings#apply` to create a new instance. */ -sealed abstract class ConsumerSettings[F[_], K, V] { +sealed abstract class ConsumerSettings[F[_], K, V] + extends KafkaClientSettings[ConsumerSettings[F, K, V]] { /** * The `Deserializer` to use for deserializing record keys. @@ -65,24 +65,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] - /** - * Properties which can be provided when creating a Java `KafkaConsumer` - * instance. Numerous functions in [[ConsumerSettings]] add properties - * here if the settings are used by the Java `KafkaConsumer`. - */ - def properties: Map[String, String] - - /** - * Returns a new [[ConsumerSettings]] instance with the specified - * bootstrap servers. This is equivalent to setting the following - * property using the [[withProperty]] function. - * - * {{{ - * ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - * }}} - */ - def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] - /** * Returns a new [[ConsumerSettings]] instance with the specified * auto offset reset. This is equivalent to setting the following @@ -95,17 +77,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V] - /** - * Returns a new [[ConsumerSettings]] instance with the specified - * client id. This is equivalent to setting the following property - * using the [[withProperty]] function. - * - * {{{ - * ConsumerConfig.CLIENT_ID_CONFIG - * }}} - */ - def withClientId(clientId: String): ConsumerSettings[F, K, V] - /** * Returns a new [[ConsumerSettings]] instance with the specified * group id. This is equivalent to setting the following property @@ -202,18 +173,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withAutoCommitInterval(autoCommitInterval: FiniteDuration): ConsumerSettings[F, K, V] - /** - * Returns a new [[ConsumerSettings]] instance with the specified - * request timeout. This is equivalent to setting the following - * property using the [[withProperty]] function, except you can - * specify it with a `FiniteDuration` instead of a `String`. - * - * {{{ - * ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG - * }}} - */ - def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V] - /** * Returns a new [[ConsumerSettings]] instance with the specified * default api timeout. This is equivalent to setting the following @@ -250,50 +209,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def withAllowAutoCreateTopics(allowAutoCreateTopics: Boolean): ConsumerSettings[F, K, V] - /** - * Returns a new [[ConsumerSettings]] instance with the specified - * client rack. This is equivalent to setting the following - * property using the [[withProperty]] function. - * - * {{{ - * ConsumerConfig.CLIENT_RACK_CONFIG - * }}} - */ - def withClientRack(clientRack: String): ConsumerSettings[F, K, V] - - /** - * Includes a property with the specified `key` and `value`. - * The key should be one of the keys in `ConsumerConfig`, - * and the value should be a valid choice for the key. - */ - def withProperty(key: String, value: String): ConsumerSettings[F, K, V] - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `ConsumerConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V] - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `ConsumerConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V] - - /** - * The time to wait for the Java `KafkaConsumer` to shutdown.
- *
- * The default value is 20 seconds. - */ - def closeTimeout: FiniteDuration - - /** - * Creates a new [[ConsumerSettings]] with the specified [[closeTimeout]]. - */ - def withCloseTimeout(closeTimeout: FiniteDuration): ConsumerSettings[F, K, V] - /** * The time to wait for offset commits to complete. If an offset commit * doesn't complete within this time, a [[CommitTimeoutException]] will @@ -386,11 +301,6 @@ sealed abstract class ConsumerSettings[F[_], K, V] { * instead be set to `2` and not the specified value. */ def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] - - /** - * Includes the credentials properties from the provided [[KafkaCredentialStore]] - */ - def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings[F, K, V] } object ConsumerSettings { @@ -410,9 +320,6 @@ object ConsumerSettings { override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] = copy(customBlockingContext = Some(ec)) - override def withBootstrapServers(bootstrapServers: String): ConsumerSettings[F, K, V] = - withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) - override def withAutoOffsetReset(autoOffsetReset: AutoOffsetReset): ConsumerSettings[F, K, V] = withProperty( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, @@ -423,9 +330,6 @@ object ConsumerSettings { } ) - override def withClientId(clientId: String): ConsumerSettings[F, K, V] = - withProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) - override def withGroupId(groupId: String): ConsumerSettings[F, K, V] = withProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) @@ -457,12 +361,6 @@ object ConsumerSettings { autoCommitInterval.toMillis.toString ) - override def withRequestTimeout(requestTimeout: FiniteDuration): ConsumerSettings[F, K, V] = - withProperty( - ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, - requestTimeout.toMillis.toString - ) - override def withDefaultApiTimeout( defaultApiTimeout: FiniteDuration ): ConsumerSettings[F, K, V] = @@ -485,15 +383,6 @@ object ConsumerSettings { ): ConsumerSettings[F, K, V] = withProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, allowAutoCreateTopics.toString) - override def withClientRack(clientRack: String): ConsumerSettings[F, K, V] = - withProperty(ConsumerConfig.CLIENT_RACK_CONFIG, clientRack) - - override def withProperty(key: String, value: String): ConsumerSettings[F, K, V] = - copy(properties = properties.updated(key, value)) - - override def withProperties(properties: (String, String)*): ConsumerSettings[F, K, V] = - copy(properties = this.properties ++ properties.toMap) - override def withProperties(properties: Map[String, String]): ConsumerSettings[F, K, V] = copy(properties = this.properties ++ properties) @@ -520,11 +409,6 @@ object ConsumerSettings { override def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] = copy(maxPrefetchBatches = Math.max(2, maxPrefetchBatches)) - override def withCredentials( - credentialsStore: KafkaCredentialStore - ): ConsumerSettings[F, K, V] = - withProperties(credentialsStore.properties) - override def toString: String = s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)" } diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala b/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala new file mode 100644 index 000000000..c11bd8faf --- /dev/null +++ b/modules/core/src/main/scala/fs2/kafka/KafkaClientSettings.scala @@ -0,0 +1,111 @@ +package fs2.kafka + +import fs2.kafka.security.KafkaCredentialStore +import org.apache.kafka.clients.CommonClientConfigs + +import scala.concurrent.duration._ + +/** + * Settings common to producers, consumers, and admin clients. Implemented by + * [[ConsumerSettings]], [[ProducerSettings]], and [[AdminClientSettings]]. + */ +private[kafka] trait KafkaClientSettings[Self <: KafkaClientSettings[Self]] { self => + + /** + * Properties which can be provided when creating a Java `KafkaProducer` + * instance. Numerous functions in `Settings` classes add properties + * here if the settings are used by the Java Kakfa client. + */ + def properties: Map[String, String] + + /** + * Includes a property with the specified `key` and `value`. + * The key should be one of the config keys for the underlying, + * Java client, and the value should be a valid choice for the key. + */ + final def withProperty(key: String, value: String): Self = + withProperties(properties.updated(key, value)) + + /** + * Includes the specified keys and values as properties. + * The keys should be among the config keys for the underlying, + * Java client, and the value should be a valid choice for the key. + */ + final def withProperties(properties: (String, String)*): Self = + withProperties(this.properties ++ properties.toMap) + + /** + * Includes the specified keys and values as properties. + * The keys should be among the config keys for the underlying, + * Java client, and the value should be a valid choice for the key. + */ + def withProperties(properties: Map[String, String]): Self + + /** + * Returns a new `Settings` instance with the specified + * bootstrap servers. This is equivalent to setting the following + * property using the [[withProperty]] function. + * + * {{{ + * CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + * }}} + */ + final def withBootstrapServers(bootstrapServers: String): Self = + withProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + + /** + * Returns a new `Settings` instance with the specified + * client id. This is equivalent to setting the following property + * using the [[withProperty]] function. + * + * {{{ + * ProducerConfig.CLIENT_ID_CONFIG + * }}} + */ + final def withClientId(clientId: String): Self = + withProperty(CommonClientConfigs.CLIENT_ID_CONFIG, clientId) + + /** + * Returns a new `Settings` instance with the specified + * client rack. This is equivalent to setting the following + * property using the [[withProperty]] function. + * + * {{{ + * CommonClientConfigs.CLIENT_RACK_CONFIG + * }}} + */ + final def withClientRack(clientRack: String): Self = + withProperty(CommonClientConfigs.CLIENT_RACK_CONFIG, clientRack) + + /** + * Includes the credentials properties from the provided [[KafkaCredentialStore]] + */ + final def withCredentials(credentialsStore: KafkaCredentialStore): Self = + withProperties(credentialsStore.properties) + + /** + * Returns a new `Settings` instance with the specified + * request timeout. This is equivalent to setting the following + * property using the [[withProperty]] function, except you can + * specify it with a `FiniteDuration` instead of a `String`. + * + * {{{ + * CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG + * }}} + */ + final def withRequestTimeout(requestTimeout: FiniteDuration): Self = + withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString) + + /** + * The time to wait for the Java client to shutdown.
+ *
+ * The default value is 60 seconds for [[ConsumerSettings]], 20 seconds for [[ProducerSettings]], + * and 2 seconds for [[AdminClientSettings]]. + */ + def closeTimeout: FiniteDuration + + /** + * Creates a new `Settings` instance with the specified [[closeTimeout]]. + */ + def withCloseTimeout(closeTimeout: FiniteDuration): Self +} diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala index fd7e30b14..e2b2d7a7d 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala @@ -7,7 +7,6 @@ package fs2.kafka import cats.{Applicative, Show} -import fs2.kafka.security.KafkaCredentialStore import org.apache.kafka.clients.producer.ProducerConfig import scala.concurrent.ExecutionContext @@ -26,7 +25,8 @@ import scala.concurrent.duration._ *
* Use `ProducerSettings#apply` to create a new instance. */ -sealed abstract class ProducerSettings[F[_], K, V] { +sealed abstract class ProducerSettings[F[_], K, V] + extends KafkaClientSettings[ProducerSettings[F, K, V]] { /** * The `Serializer` to use for serializing record keys. @@ -54,24 +54,6 @@ sealed abstract class ProducerSettings[F[_], K, V] { */ def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] - /** - * Properties which can be provided when creating a Java `KafkaProducer` - * instance. Numerous functions in [[ProducerSettings]] add properties - * here if the settings are used by the Java `KafkaProducer`. - */ - def properties: Map[String, String] - - /** - * Returns a new [[ProducerSettings]] instance with the specified - * bootstrap servers. This is equivalent to setting the following - * property using the [[withProperty]] function. - * - * {{{ - * ProducerConfig.BOOTSTRAP_SERVERS_CONFIG - * }}} - */ - def withBootstrapServers(bootstrapServers: String): ProducerSettings[F, K, V] - /** * Returns a new [[ProducerSettings]] instance with the specified * acknowledgements. This is equivalent to setting the following @@ -96,17 +78,6 @@ sealed abstract class ProducerSettings[F[_], K, V] { */ def withBatchSize(batchSize: Int): ProducerSettings[F, K, V] - /** - * Returns a new [[ProducerSettings]] instance with the specified - * client id. This is equivalent to setting the following property - * using the [[withProperty]] function. - * - * {{{ - * ProducerConfig.CLIENT_ID_CONFIG - * }}} - */ - def withClientId(clientId: String): ProducerSettings[F, K, V] - /** * Returns a new [[ProducerSettings]] instance with the specified * retries. This is equivalent to setting the following property @@ -158,18 +129,6 @@ sealed abstract class ProducerSettings[F[_], K, V] { */ def withLinger(linger: FiniteDuration): ProducerSettings[F, K, V] - /** - * Returns a new [[ProducerSettings]] instance with the specified - * request timeout. This is equivalent to setting the following - * property using the [[withProperty]] function, except you can - * specify it with a `FiniteDuration` instead of a `String`. - * - * {{{ - * ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG - * }}} - */ - def withRequestTimeout(requestTimeout: FiniteDuration): ProducerSettings[F, K, V] - /** * Returns a new [[ProducerSettings]] instance with the specified * delivery timeout. This is equivalent to setting the following @@ -182,39 +141,6 @@ sealed abstract class ProducerSettings[F[_], K, V] { */ def withDeliveryTimeout(deliveryTimeout: FiniteDuration): ProducerSettings[F, K, V] - /** - * Includes a property with the specified `key` and `value`. - * The key should be one of the keys in `ProducerConfig`, - * and the value should be a valid choice for the key. - */ - def withProperty(key: String, value: String): ProducerSettings[F, K, V] - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `ProducerConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: (String, String)*): ProducerSettings[F, K, V] - - /** - * Includes the specified keys and values as properties. The - * keys should be part of the `ProducerConfig` keys, and - * the values should be valid choices for the keys. - */ - def withProperties(properties: Map[String, String]): ProducerSettings[F, K, V] - - /** - * The time to wait for the Java `KafkaProducer` to shutdown.
- *
- * The default value is 60 seconds. - */ - def closeTimeout: FiniteDuration - - /** - * Creates a new [[ProducerSettings]] with the specified [[closeTimeout]]. - */ - def withCloseTimeout(closeTimeout: FiniteDuration): ProducerSettings[F, K, V] - /** * The maximum number of [[ProducerRecords]] to produce in the same batch.
*
@@ -226,11 +152,6 @@ sealed abstract class ProducerSettings[F[_], K, V] { * Creates a new [[ProducerSettings]] with the specified [[parallelism]]. */ def withParallelism(parallelism: Int): ProducerSettings[F, K, V] - - /** - * Includes the credentials properties from the provided [[KafkaCredentialStore]] - */ - def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings[F, K, V] } object ProducerSettings { @@ -245,9 +166,6 @@ object ProducerSettings { override def withCustomBlockingContext(ec: ExecutionContext): ProducerSettings[F, K, V] = copy(customBlockingContext = Some(ec)) - override def withBootstrapServers(bootstrapServers: String): ProducerSettings[F, K, V] = - withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) - override def withAcks(acks: Acks): ProducerSettings[F, K, V] = withProperty(ProducerConfig.ACKS_CONFIG, acks match { case Acks.ZeroAcks => "0" @@ -258,9 +176,6 @@ object ProducerSettings { override def withBatchSize(batchSize: Int): ProducerSettings[F, K, V] = withProperty(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString) - override def withClientId(clientId: String): ProducerSettings[F, K, V] = - withProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId) - override def withRetries(retries: Int): ProducerSettings[F, K, V] = withProperty(ProducerConfig.RETRIES_CONFIG, retries.toString) @@ -278,18 +193,9 @@ object ProducerSettings { override def withLinger(linger: FiniteDuration): ProducerSettings[F, K, V] = withProperty(ProducerConfig.LINGER_MS_CONFIG, linger.toMillis.toString) - override def withRequestTimeout(requestTimeout: FiniteDuration): ProducerSettings[F, K, V] = - withProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toMillis.toString) - override def withDeliveryTimeout(deliveryTimeout: FiniteDuration): ProducerSettings[F, K, V] = withProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout.toMillis.toString) - override def withProperty(key: String, value: String): ProducerSettings[F, K, V] = - copy(properties = properties.updated(key, value)) - - override def withProperties(properties: (String, String)*): ProducerSettings[F, K, V] = - copy(properties = this.properties ++ properties.toMap) - override def withProperties(properties: Map[String, String]): ProducerSettings[F, K, V] = copy(properties = this.properties ++ properties) @@ -299,14 +205,6 @@ object ProducerSettings { override def withParallelism(parallelism: Int): ProducerSettings[F, K, V] = copy(parallelism = parallelism) - /** - * Includes the credentials properties from the provided [[KafkaCredentialStore]] - */ - override def withCredentials( - credentialsStore: KafkaCredentialStore - ): ProducerSettings[F, K, V] = - withProperties(credentialsStore.properties) - override def toString: String = s"ProducerSettings(closeTimeout = $closeTimeout)" }