Skip to content

Commit

Permalink
MCPODS-7035 Ffw storage http client reuse (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamczyk-HERE authored Mar 14, 2024
1 parent 91fd92b commit ea8970d
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/
package com.here.naksha.storage.http;

import static com.here.naksha.storage.http.RequestSender.KeyProperties;

import com.here.naksha.lib.core.NakshaContext;
import com.here.naksha.lib.core.lambdas.Fe1;
import com.here.naksha.lib.core.models.naksha.Storage;
import com.here.naksha.lib.core.storage.IReadSession;
import com.here.naksha.lib.core.storage.IStorage;
import com.here.naksha.lib.core.util.json.JsonSerializable;
import java.net.http.HttpClient;
import java.time.Duration;
import com.here.naksha.storage.http.cache.RequestSenderCache;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.jetbrains.annotations.NotNull;
Expand All @@ -41,15 +42,13 @@ public class HttpStorage implements IStorage {

public HttpStorage(@NotNull Storage storage) {
HttpStorageProperties properties = HttpStorage.getProperties(storage);
HttpClient httpStorageClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(properties.getConnectTimeout()))
.build();
requestSender = new RequestSender(
storage.getId(),
properties.getUrl(),
properties.getHeaders(),
httpStorageClient,
properties.getSocketTimeout());
requestSender = RequestSenderCache.getInstance()
.getSenderWith(new KeyProperties(
storage.getId(),
properties.getUrl(),
properties.getHeaders(),
properties.getConnectTimeout(),
properties.getSocketTimeout()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RequestSender {
public class RequestSender {

private static final Logger log = LoggerFactory.getLogger(RequestSender.class);
private final String name;
private final String hostUrl;
private final Map<String, String> defaultHeaders;

@NotNull
private final HttpClient httpClient;
private final long socketTimeoutSec;

public RequestSender(
final String name,
String hostUrl,
Map<String, String> defaultHeaders,
HttpClient httpClient,
long socketTimeoutSec) {
this.name = name;
this.httpClient = httpClient;
this.hostUrl = hostUrl;
this.defaultHeaders = defaultHeaders;
this.socketTimeoutSec = socketTimeoutSec;
@NotNull
private final RequestSender.KeyProperties keyProps;

public RequestSender(@NotNull RequestSender.KeyProperties keyProps) {
this.keyProps = keyProps;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(keyProps.connectionTimeoutSec))
.build();
}

/**
Expand All @@ -71,10 +66,10 @@ HttpResponse<String> sendRequest(
@Nullable Map<String, String> headers,
@Nullable String httpMethod,
@Nullable String body) {
URI uri = URI.create(hostUrl + endpoint);
HttpRequest.Builder builder = newBuilder().uri(uri).timeout(Duration.ofSeconds(socketTimeoutSec));
URI uri = URI.create(keyProps.hostUrl + endpoint);
HttpRequest.Builder builder = newBuilder().uri(uri).timeout(Duration.ofSeconds(keyProps.socketTimeoutSec));

if (keepDefHeaders) defaultHeaders.forEach(builder::header);
if (keepDefHeaders) keyProps.defaultHeaders.forEach(builder::header);
if (headers != null) headers.forEach(builder::header);

HttpRequest.BodyPublisher bodyPublisher =
Expand All @@ -91,7 +86,7 @@ private HttpResponse<String> sendRequest(HttpRequest request) {
try {
CompletableFuture<HttpResponse<String>> futureResponse =
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
response = futureResponse.get(socketTimeoutSec, TimeUnit.SECONDS);
response = futureResponse.get(keyProps.socketTimeoutSec, TimeUnit.SECONDS);
return response;
} catch (Exception e) {
log.warn("We got exception while executing Http request against remote server.", e);
Expand All @@ -101,13 +96,29 @@ private HttpResponse<String> sendRequest(HttpRequest request) {
log.info(
"[Storage API stats => type,storageId,host,method,path,status,timeTakenMs,resSize] - StorageAPIStats {} {} {} {} {} {} {} {}",
"HttpStorage",
this.name,
this.hostUrl,
keyProps.name,
keyProps.hostUrl,
request.method(),
request.uri(),
(response == null) ? "-" : response.statusCode(),
executionTime,
(response == null) ? 0 : response.body().length());
}
}

public boolean hasKeyProps(KeyProperties thatKeyProps) {
return this.keyProps.equals(thatKeyProps);
}

/**
* Set of properties that are just enough to construct the sender
* and distinguish unambiguously between objects
* in terms of their effective configuration
*/
public record KeyProperties(
@NotNull String name,
@NotNull String hostUrl,
@NotNull Map<String, String> defaultHeaders,
long connectionTimeoutSec,
long socketTimeoutSec) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2017-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/
package com.here.naksha.storage.http.cache;

import static com.here.naksha.storage.http.RequestSender.KeyProperties;

import com.here.naksha.storage.http.RequestSender;
import java.util.concurrent.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class RequestSenderCache {

public static final int CLEANER_PERIOD_HOURS = 8;
private final ConcurrentMap<String, RequestSender> requestSenders;

private RequestSenderCache() {
this(new ConcurrentHashMap<>(), CLEANER_PERIOD_HOURS, TimeUnit.HOURS);
}

RequestSenderCache(ConcurrentMap<String, RequestSender> requestSenders, int cleanPeriod, TimeUnit cleanPeriodUnit) {
this.requestSenders = requestSenders;
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(requestSenders::clear, cleanPeriod, cleanPeriod, cleanPeriodUnit);
}

@NotNull
public static RequestSenderCache getInstance() {
return InstanceHolder.instance;
}

@NotNull
public RequestSender getSenderWith(KeyProperties keyProperties) {
return requestSenders.compute(
keyProperties.name(), (__, cachedSender) -> getUpdated(cachedSender, keyProperties));
}

private @NotNull RequestSender getUpdated(
@Nullable RequestSender cachedSender, @NotNull KeyProperties keyProperties) {
if (cachedSender != null && cachedSender.hasKeyProps(keyProperties)) return cachedSender;
else return new RequestSender(keyProperties);
}

private static final class InstanceHolder {
private static final RequestSenderCache instance = new RequestSenderCache();
}
}
Loading

0 comments on commit ea8970d

Please sign in to comment.