diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java index acd01a75e5da1..32b95300e4c82 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java @@ -14,8 +14,10 @@ package io.trino.filesystem.azure; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; import io.airlift.units.DataSize; import io.airlift.units.DataSize.Unit; +import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; @@ -34,6 +36,7 @@ public enum AuthType private DataSize writeBlockSize = DataSize.of(4, Unit.MEGABYTE); private int maxWriteConcurrency = 8; private DataSize maxSingleUploadSize = DataSize.of(4, Unit.MEGABYTE); + private Integer maxHttpRequests = 2 * Runtime.getRuntime().availableProcessors(); @NotNull public AuthType getAuthType() @@ -111,4 +114,18 @@ public AzureFileSystemConfig setMaxSingleUploadSize(DataSize maxSingleUploadSize this.maxSingleUploadSize = maxSingleUploadSize; return this; } + + @Min(1) + public int getMaxHttpRequests() + { + return maxHttpRequests; + } + + @Config("azure.max-http-requests") + @ConfigDescription("Maximum number of concurrent HTTP requests to Azure on every node") + public AzureFileSystemConfig setMaxHttpRequests(int maxHttpRequests) + { + this.maxHttpRequests = maxHttpRequests; + return this; + } } diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java index 2f00b9ca94232..e7597051c7e18 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java @@ -56,7 +56,8 @@ public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, config.getReadBlockSize(), config.getWriteBlockSize(), config.getMaxWriteConcurrency(), - config.getMaxSingleUploadSize()); + config.getMaxSingleUploadSize(), + config.getMaxHttpRequests()); } public AzureFileSystemFactory( @@ -66,7 +67,8 @@ public AzureFileSystemFactory( DataSize readBlockSize, DataSize writeBlockSize, int maxWriteConcurrency, - DataSize maxSingleUploadSize) + DataSize maxSingleUploadSize, + int maxHttpRequests) { this.auth = requireNonNull(azureAuth, "azureAuth is null"); this.endpoint = requireNonNull(endpoint, "endpoint is null"); @@ -77,7 +79,12 @@ public AzureFileSystemFactory( this.maxSingleUploadSize = requireNonNull(maxSingleUploadSize, "maxSingleUploadSize is null"); this.tracingOptions = new OpenTelemetryTracingOptions().setOpenTelemetry(openTelemetry); - okHttpClient = new OkHttpClient.Builder().build(); + Dispatcher dispatcher = new Dispatcher(); + dispatcher.setMaxRequests(maxHttpRequests); + dispatcher.setMaxRequestsPerHost(maxHttpRequests); + okHttpClient = new OkHttpClient.Builder() + .dispatcher(dispatcher) + .build(); HttpClientOptions clientOptions = new HttpClientOptions(); clientOptions.setTracingOptions(tracingOptions); httpClient = createAzureHttpClient(okHttpClient, clientOptions); @@ -101,9 +108,6 @@ public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpCl Integer poolSize = clientOptions.getMaximumConnectionPoolSize(); // By default, OkHttp uses a maximum idle connection count of 5. int maximumConnectionPoolSize = (poolSize != null && poolSize > 0) ? poolSize : 5; - Dispatcher dispatcher = new Dispatcher(); - dispatcher.setMaxRequests(Runtime.getRuntime().availableProcessors() * 4); - dispatcher.setMaxRequestsPerHost(Runtime.getRuntime().availableProcessors() * 2); return new OkHttpAsyncHttpClientBuilder(okHttpClient) .proxy(clientOptions.getProxyOptions()) @@ -113,7 +117,6 @@ public static HttpClient createAzureHttpClient(OkHttpClient okHttpClient, HttpCl .readTimeout(clientOptions.getReadTimeout()) .connectionPool(new ConnectionPool(maximumConnectionPoolSize, clientOptions.getConnectionIdleTimeout().toMillis(), TimeUnit.MILLISECONDS)) - .dispatcher(dispatcher) .build(); } } diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java index c45b09e75c07f..ad4299dc3c542 100644 --- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java +++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java @@ -36,7 +36,8 @@ void testDefaults() .setReadBlockSize(DataSize.of(4, Unit.MEGABYTE)) .setWriteBlockSize(DataSize.of(4, Unit.MEGABYTE)) .setMaxWriteConcurrency(8) - .setMaxSingleUploadSize(DataSize.of(4, Unit.MEGABYTE))); + .setMaxSingleUploadSize(DataSize.of(4, Unit.MEGABYTE)) + .setMaxHttpRequests(2 * Runtime.getRuntime().availableProcessors())); } @Test @@ -49,6 +50,7 @@ public void testExplicitPropertyMappings() .put("azure.write-block-size", "5MB") .put("azure.max-write-concurrency", "7") .put("azure.max-single-upload-size", "7MB") + .put("azure.max-http-requests", "128") .buildOrThrow(); AzureFileSystemConfig expected = new AzureFileSystemConfig() @@ -57,7 +59,8 @@ public void testExplicitPropertyMappings() .setReadBlockSize(DataSize.of(3, Unit.MEGABYTE)) .setWriteBlockSize(DataSize.of(5, Unit.MEGABYTE)) .setMaxWriteConcurrency(7) - .setMaxSingleUploadSize(DataSize.of(7, Unit.MEGABYTE)); + .setMaxSingleUploadSize(DataSize.of(7, Unit.MEGABYTE)) + .setMaxHttpRequests(128); assertFullMapping(properties, expected); }