Skip to content

Commit

Permalink
dekaf: Enable tuning connection pool size
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Oct 4, 2024
1 parent d55ecee commit 02db746
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
14 changes: 12 additions & 2 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,19 +345,29 @@ impl KafkaApiClient {
return Ok(client.clone());
}

let new_client = Self::connect(broker_url, self.sasl_config.clone()).await?;
let new_client = Self::connect(
broker_url,
self.sasl_config.clone(),
self.pool.status().max_size,
)
.await?;

clients.insert(broker_url.to_owned(), new_client.clone());

Ok(new_client)
}

#[instrument(name = "api_client_connect", skip(sasl_config))]
pub async fn connect(broker_url: &str, sasl_config: Arc<SASLConfig>) -> anyhow::Result<Self> {
pub async fn connect(
broker_url: &str,
sasl_config: Arc<SASLConfig>,
pool_size: usize,
) -> anyhow::Result<Self> {
let pool = Pool::builder(KafkaConnectionParams {
broker_url: broker_url.to_owned(),
sasl_config: sasl_config.clone(),
})
.max_size(pool_size)
.build()?;

// Close idle connections, and any free connection older than 30m.
Expand Down
6 changes: 6 additions & 0 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ pub struct Cli {
#[arg(long, env = "ENCRYPTION_SECRET")]
encryption_secret: String,

/// The maximum number of connections to a particular upstream kafka broker that can be
/// open at any one time. These connections will be pooled and shared between all requests.
#[arg(long, env = "BROKER_CONNECTION_POOL_SIZE", default_value = "20")]
broker_connection_pool_size: usize,

#[command(flatten)]
tls: Option<TlsArgs>,
}
Expand Down Expand Up @@ -129,6 +134,7 @@ async fn main() -> anyhow::Result<()> {
cli.default_broker_username,
cli.default_broker_password,
)?,
cli.broker_connection_pool_size
).await.context(
"failed to connect or authenticate to upstream Kafka broker used for serving group management APIs",
)?,
Expand Down

0 comments on commit 02db746

Please sign in to comment.