diff --git a/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/HttpStorage.java b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/HttpStorage.java index d5ed4fbfc..2665c7e33 100644 --- a/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/HttpStorage.java +++ b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/HttpStorage.java @@ -18,14 +18,15 @@ */ package com.here.naksha.storage.http; +import static com.here.naksha.storage.http.RequestSender.KeyProperties; + import com.here.naksha.lib.core.NakshaContext; import com.here.naksha.lib.core.lambdas.Fe1; import com.here.naksha.lib.core.models.naksha.Storage; import com.here.naksha.lib.core.storage.IReadSession; import com.here.naksha.lib.core.storage.IStorage; import com.here.naksha.lib.core.util.json.JsonSerializable; -import java.net.http.HttpClient; -import java.time.Duration; +import com.here.naksha.storage.http.cache.RequestSenderCache; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import org.jetbrains.annotations.NotNull; @@ -41,15 +42,13 @@ public class HttpStorage implements IStorage { public HttpStorage(@NotNull Storage storage) { HttpStorageProperties properties = HttpStorage.getProperties(storage); - HttpClient httpStorageClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(properties.getConnectTimeout())) - .build(); - requestSender = new RequestSender( - storage.getId(), - properties.getUrl(), - properties.getHeaders(), - httpStorageClient, - properties.getSocketTimeout()); + requestSender = RequestSenderCache.getInstance() + .getSenderWith(new KeyProperties( + storage.getId(), + properties.getUrl(), + properties.getHeaders(), + properties.getConnectTimeout(), + properties.getSocketTimeout())); } @Override diff --git a/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/RequestSender.java b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/RequestSender.java index 7515a6712..7c3bd0fb4 100644 --- a/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/RequestSender.java +++ b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/RequestSender.java @@ -34,26 +34,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class RequestSender { +public class RequestSender { private static final Logger log = LoggerFactory.getLogger(RequestSender.class); - private final String name; - private final String hostUrl; - private final Map defaultHeaders; + + @NotNull private final HttpClient httpClient; - private final long socketTimeoutSec; - public RequestSender( - final String name, - String hostUrl, - Map defaultHeaders, - HttpClient httpClient, - long socketTimeoutSec) { - this.name = name; - this.httpClient = httpClient; - this.hostUrl = hostUrl; - this.defaultHeaders = defaultHeaders; - this.socketTimeoutSec = socketTimeoutSec; + @NotNull + private final RequestSender.KeyProperties keyProps; + + public RequestSender(@NotNull RequestSender.KeyProperties keyProps) { + this.keyProps = keyProps; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(keyProps.connectionTimeoutSec)) + .build(); } /** @@ -71,10 +66,10 @@ HttpResponse sendRequest( @Nullable Map headers, @Nullable String httpMethod, @Nullable String body) { - URI uri = URI.create(hostUrl + endpoint); - HttpRequest.Builder builder = newBuilder().uri(uri).timeout(Duration.ofSeconds(socketTimeoutSec)); + URI uri = URI.create(keyProps.hostUrl + endpoint); + HttpRequest.Builder builder = newBuilder().uri(uri).timeout(Duration.ofSeconds(keyProps.socketTimeoutSec)); - if (keepDefHeaders) defaultHeaders.forEach(builder::header); + if (keepDefHeaders) keyProps.defaultHeaders.forEach(builder::header); if (headers != null) headers.forEach(builder::header); HttpRequest.BodyPublisher bodyPublisher = @@ -91,7 +86,7 @@ private HttpResponse sendRequest(HttpRequest request) { try { CompletableFuture> futureResponse = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); - response = futureResponse.get(socketTimeoutSec, TimeUnit.SECONDS); + response = futureResponse.get(keyProps.socketTimeoutSec, TimeUnit.SECONDS); return response; } catch (Exception e) { log.warn("We got exception while executing Http request against remote server.", e); @@ -101,8 +96,8 @@ private HttpResponse sendRequest(HttpRequest request) { log.info( "[Storage API stats => type,storageId,host,method,path,status,timeTakenMs,resSize] - StorageAPIStats {} {} {} {} {} {} {} {}", "HttpStorage", - this.name, - this.hostUrl, + keyProps.name, + keyProps.hostUrl, request.method(), request.uri(), (response == null) ? "-" : response.statusCode(), @@ -110,4 +105,20 @@ private HttpResponse sendRequest(HttpRequest request) { (response == null) ? 0 : response.body().length()); } } + + public boolean hasKeyProps(KeyProperties thatKeyProps) { + return this.keyProps.equals(thatKeyProps); + } + + /** + * Set of properties that are just enough to construct the sender + * and distinguish unambiguously between objects + * in terms of their effective configuration + */ + public record KeyProperties( + @NotNull String name, + @NotNull String hostUrl, + @NotNull Map defaultHeaders, + long connectionTimeoutSec, + long socketTimeoutSec) {} } diff --git a/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/cache/RequestSenderCache.java b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/cache/RequestSenderCache.java new file mode 100644 index 000000000..cfe2e2f30 --- /dev/null +++ b/here-naksha-storage-http/src/main/java/com/here/naksha/storage/http/cache/RequestSenderCache.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2017-2023 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ +package com.here.naksha.storage.http.cache; + +import static com.here.naksha.storage.http.RequestSender.KeyProperties; + +import com.here.naksha.storage.http.RequestSender; +import java.util.concurrent.*; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class RequestSenderCache { + + public static final int CLEANER_PERIOD_HOURS = 8; + private final ConcurrentMap requestSenders; + + private RequestSenderCache() { + this(new ConcurrentHashMap<>(), CLEANER_PERIOD_HOURS, TimeUnit.HOURS); + } + + RequestSenderCache(ConcurrentMap requestSenders, int cleanPeriod, TimeUnit cleanPeriodUnit) { + this.requestSenders = requestSenders; + Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate(requestSenders::clear, cleanPeriod, cleanPeriod, cleanPeriodUnit); + } + + @NotNull + public static RequestSenderCache getInstance() { + return InstanceHolder.instance; + } + + @NotNull + public RequestSender getSenderWith(KeyProperties keyProperties) { + return requestSenders.compute( + keyProperties.name(), (__, cachedSender) -> getUpdated(cachedSender, keyProperties)); + } + + private @NotNull RequestSender getUpdated( + @Nullable RequestSender cachedSender, @NotNull KeyProperties keyProperties) { + if (cachedSender != null && cachedSender.hasKeyProps(keyProperties)) return cachedSender; + else return new RequestSender(keyProperties); + } + + private static final class InstanceHolder { + private static final RequestSenderCache instance = new RequestSenderCache(); + } +} diff --git a/here-naksha-storage-http/src/test/java/com/here/naksha/storage/http/cache/RequestSenderCacheTest.java b/here-naksha-storage-http/src/test/java/com/here/naksha/storage/http/cache/RequestSenderCacheTest.java new file mode 100644 index 000000000..4d912b07d --- /dev/null +++ b/here-naksha-storage-http/src/test/java/com/here/naksha/storage/http/cache/RequestSenderCacheTest.java @@ -0,0 +1,252 @@ +package com.here.naksha.storage.http.cache; + +import com.here.naksha.storage.http.RequestSender; +import com.here.naksha.storage.http.RequestSender.KeyProperties; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; + +class RequestSenderCacheTest { + + public static final String EXAMPLE_URL = "www.example.naksha.com"; + + public static final int EXAMPLE_CONNECTION_TIMEOUT = 1; + + public static final int EXAMPLE_SOCKET_TIMEOUT = 1; + + public static final Map EXAMPLE_HEADERS = Map.of("Authorization", "Bearer exampleToken", "Content-Type", "application/json"); + public static final Map MODIFIED_HEADERS = Map.of("Authorization", "Bearer modifiedToken", "Content-Type", "application/json"); + + public static final String ID_1 = "id_1"; + public static final String ID_2 = "id_2"; + public static final String ID_3 = "id_3"; + public static final KeyProperties PROP_ID_1 = new KeyProperties( + ID_1, + EXAMPLE_URL, + EXAMPLE_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + public static final KeyProperties PROP_ID_1_COPY = new KeyProperties( + ID_1, + EXAMPLE_URL, + EXAMPLE_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + public static final KeyProperties PROP_ID_1_INT_CHANGED = new KeyProperties( + ID_1, + EXAMPLE_URL, + EXAMPLE_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + 2 + ); + + public static final KeyProperties PROP_ID_1_MAP_COPIED = new KeyProperties( + ID_1, + EXAMPLE_URL, + Map.copyOf(EXAMPLE_HEADERS), + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + public static final KeyProperties PROP_ID_1_MAP_CHANGED = new KeyProperties( + ID_1, + EXAMPLE_URL, + MODIFIED_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + public static final KeyProperties PROP_ID_2 = new KeyProperties( + ID_2, + EXAMPLE_URL, + EXAMPLE_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + public static final KeyProperties PROP_ID_3 = new KeyProperties( + ID_3, + EXAMPLE_URL, + EXAMPLE_HEADERS, + EXAMPLE_CONNECTION_TIMEOUT, + EXAMPLE_SOCKET_TIMEOUT + ); + + + @Test + void testOneId() { + // Setup + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + 8, + TimeUnit.HOURS + ); + + assertEquals(PROP_ID_1, PROP_ID_1_COPY); + assertNotSame(PROP_ID_1, PROP_ID_1_COPY); + + // Tests + assertEquals(0, senders.size()); + + RequestSender senderId1 = cache.getSenderWith(PROP_ID_1); + assertEquals(1, senders.size()); + + RequestSender senderId1Copy = cache.getSenderWith(PROP_ID_1_COPY); + assertEquals(1, senders.size()); + assertSame(senderId1, senderId1Copy); + + RequestSender senderId1IntChanged + = cache.getSenderWith(PROP_ID_1_INT_CHANGED); + assertEquals(1, senders.size()); + assertNotEquals(senderId1Copy, senderId1IntChanged); + } + + @Test + void testOneIdMapChange() { + // Setup + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + 8, + TimeUnit.HOURS + ); + + assertEquals(PROP_ID_1, PROP_ID_1_MAP_COPIED); + assertNotSame(PROP_ID_1, PROP_ID_1_MAP_COPIED); + + // Tests + assertEquals(0, senders.size()); + + RequestSender senderId1 = cache.getSenderWith(PROP_ID_1); + assertEquals(1, senders.size()); + + RequestSender senderId1MapCopied = cache.getSenderWith(PROP_ID_1_MAP_COPIED); + assertEquals(1, senders.size()); + assertSame(senderId1, senderId1MapCopied); + + RequestSender senderId1MapChanged + = cache.getSenderWith(PROP_ID_1_MAP_CHANGED); + assertEquals(1, senders.size()); + assertNotEquals(senderId1MapCopied, senderId1MapChanged); + } + + @Test + void testMoreIds() { + // Setup + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + 8, + TimeUnit.HOURS + ); + + // Tests + assertEquals(0, senders.size()); + + RequestSender senderId1 = cache.getSenderWith(PROP_ID_1); + assertEquals(1, senders.size()); + + RequestSender senderId2 = cache.getSenderWith(PROP_ID_2); + assertEquals(2, senders.size()); + assertNotEquals(senderId1, senderId2); + + RequestSender senderId3 = cache.getSenderWith(PROP_ID_3); + assertEquals(3, senders.size()); + assertNotEquals(senderId1, senderId3); + + RequestSender newSenderId1 = cache.getSenderWith(PROP_ID_1); + assertEquals(3, senders.size()); + assertSame(senderId1, newSenderId1); + } + + @Test + void testCleanup() throws InterruptedException { + // Setup + int cleanPeriodMs = 1000; + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + cleanPeriodMs, + TimeUnit.MILLISECONDS + ); + + // Tests + assertEquals(0, senders.size()); + cache.getSenderWith(PROP_ID_1); + cache.getSenderWith(PROP_ID_2); + cache.getSenderWith(PROP_ID_3); + assertEquals(3, senders.size()); + Thread.sleep(cleanPeriodMs + 100); + assertEquals(0, senders.size()); + cache.getSenderWith(PROP_ID_1); + assertEquals(1, senders.size()); + } + + + @RepeatedTest(10) + void testCleanupConcurrency() throws InterruptedException { + // Setup + int cleanPeriodMs = 1; + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + cleanPeriodMs, + TimeUnit.MILLISECONDS + ); + + // Tests + assertEquals(0, senders.size()); + + assertTrue(cache.getSenderWith(PROP_ID_1).hasKeyProps(PROP_ID_1)); + assertTrue(cache.getSenderWith(PROP_ID_2).hasKeyProps(PROP_ID_2)); + assertTrue(cache.getSenderWith(PROP_ID_3).hasKeyProps(PROP_ID_3)); + Thread.sleep(cleanPeriodMs + 100); + assertEquals(0, senders.size()); + } + + @RepeatedTest(10) + void testGetSenderConcurrency() throws InterruptedException { + // Setup + ConcurrentMap senders = new ConcurrentHashMap<>(); + RequestSenderCache cache = new RequestSenderCache( + senders, + 8, + TimeUnit.HOURS + ); + AtomicReference senderId1 = new AtomicReference<>(); + AtomicReference senderId1Copy = new AtomicReference<>(); + AtomicReference senderId1IntChanged = new AtomicReference<>(); + AtomicReference senderId1MapChanged = new AtomicReference<>(); + + // Tests + List threads = List.of( + new Thread(() -> senderId1.set(cache.getSenderWith(PROP_ID_1))), + new Thread(() -> senderId1Copy.set(cache.getSenderWith(PROP_ID_1_COPY))), + new Thread(() -> senderId1IntChanged.set(cache.getSenderWith(PROP_ID_1_INT_CHANGED))), + new Thread(() -> senderId1MapChanged.set(cache.getSenderWith(PROP_ID_1_MAP_CHANGED))) + ); + threads.forEach(Thread::start); + for (Thread thread : threads) { + thread.join(); + } + + assertTrue(senderId1.get().hasKeyProps(PROP_ID_1)); + assertTrue(senderId1Copy.get().hasKeyProps(PROP_ID_1_COPY)); + assertTrue(senderId1IntChanged.get().hasKeyProps(PROP_ID_1_INT_CHANGED)); + assertTrue(senderId1MapChanged.get().hasKeyProps(PROP_ID_1_MAP_CHANGED)); + } + +} \ No newline at end of file