diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java
index 9daec796a3..8beaa12938 100644
--- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java
+++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java
@@ -605,7 +605,7 @@ protected ProtocolAdapterCommandConsumerFactoryImpl commandConsumerFactory(final
LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}]",
ProtocolAdapterCommandConsumerFactoryImpl.class.getName());
- return new ProtocolAdapterCommandConsumerFactoryImpl(commandRouterClient, getComponentName());
+ return new ProtocolAdapterCommandConsumerFactoryImpl(vertx, commandRouterClient, getComponentName());
}
private ClientConfigProperties commandResponseSenderConfig() {
diff --git a/clients/command/pom.xml b/clients/command/pom.xml
index 4ec5233369..32b549b4b3 100644
--- a/clients/command/pom.xml
+++ b/clients/command/pom.xml
@@ -67,11 +67,26 @@
opentracing-noop
true
+
+ io.quarkus
+ quarkus-kubernetes-client
+
+
+ org.jboss.spec.javax.xml.bind
+ jboss-jaxb-api_2.3_spec
+
+
+
- ch.qos.logback
- logback-classic
+ io.fabric8
+ kubernetes-server-mock
+ test
+
+
+ io.quarkus
+ quarkus-junit5
test
diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java b/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java
index eb2edb3725..c6268bddec 100644
--- a/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java
+++ b/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java
@@ -43,23 +43,25 @@ private CommandRoutingUtil() {
/**
* Creates a new adapter instance identifier, used for identifying the protocol adapter to route a command to.
*
- * If this method is invoked from within a docker container in a Kubernetes cluster, the format is
- * [prefix]_[docker_container_id]_[counter], with prefix being the name of the Kubernetes pod.
+ * If this application is running in a Kubernetes cluster and the container id is supplied as parameter here,
+ * the format of the returned id is [prefix]_[container_id]_[counter], with prefix being the name of
+ * the Kubernetes pod.
* See also {@link #getK8sPodNameAndContainerIdFromAdapterInstanceId(String)}.
*
* If not running in a Kubernetes cluster, a random id with the given adapter name as prefix is used.
*
* @param adapterName The adapter name.
+ * @param k8sContainerId The container id or {@code null} if not running in Kubernetes.
* @param counter The counter value to use.
* @return The new adapter instance identifier.
*/
- public static String getNewAdapterInstanceId(final String adapterName, final int counter) {
- final String k8sContainerId = CgroupV1KubernetesContainerUtil.getContainerId();
+ public static String getNewAdapterInstanceId(final String adapterName, final String k8sContainerId,
+ final int counter) {
if (k8sContainerId == null || k8sContainerId.length() < 12) {
return getNewAdapterInstanceIdForNonK8sEnv(adapterName);
} else {
- // running in Kubernetes: prefer HOSTNAME env var containing the pod name
- String prefix = System.getenv("HOSTNAME");
+ // running in Kubernetes: use pod name as prefix
+ String prefix = KubernetesContainerInfoProvider.getInstance().getPodName();
if (Strings.isNullOrEmpty(prefix)) {
prefix = adapterName;
}
diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java b/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java
new file mode 100644
index 0000000000..83e32f58aa
--- /dev/null
+++ b/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java
@@ -0,0 +1,229 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *******************************************************************************/
+
+package org.eclipse.hono.client.command;
+
+import java.net.HttpURLConnection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.eclipse.hono.client.ServerErrorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.utils.PodStatusUtil;
+import io.vertx.core.Context;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+
+/**
+ * Provides information about the Kubernetes container if this application is running in Kubernetes.
+ */
+public class KubernetesContainerInfoProvider {
+
+ /**
+ * Name of the environment variable that contains the name of the container that this application is running in.
+ *
+ * Such an environment variable needs to be set to determine the container id if the pod that this application
+ * is running in contains multiple running containers.
+ */
+ public static final String KUBERNETES_CONTAINER_NAME_ENV_VAR = "KUBERNETES_CONTAINER_NAME";
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesContainerInfoProvider.class);
+
+ private static final KubernetesContainerInfoProvider INSTANCE = new KubernetesContainerInfoProvider();
+
+ private final AtomicReference> containerIdPromiseRef = new AtomicReference<>();
+
+ private String containerId;
+ private final String podName;
+ private final boolean isRunningInKubernetes;
+
+ private KubernetesContainerInfoProvider() {
+ isRunningInKubernetes = System.getenv("KUBERNETES_SERVICE_HOST") != null;
+ podName = isRunningInKubernetes ? System.getenv("HOSTNAME") : null;
+ }
+
+ /**
+ * Gets the KubernetesContainerInfoProvider instance.
+ *
+ * @return The KubernetesContainerInfoProvider.
+ */
+ public static KubernetesContainerInfoProvider getInstance() {
+ return INSTANCE;
+ }
+
+ boolean isRunningInKubernetes() {
+ return isRunningInKubernetes;
+ }
+
+ /**
+ * Gets the Kubernetes pod name if running in Kubernetes.
+ *
+ * @return The pod name or {@code null} if not running in Kubernetes.
+ */
+ public String getPodName() {
+ return podName;
+ }
+
+ /**
+ * Determines the container id if running in a container in Kubernetes.
+ *
+ * First an attempt is made to get the container id by inspecting /proc/self/cgroup
+ * (via {@link CgroupV1KubernetesContainerUtil#getContainerId()}).
+ * If not found there, the container id is queried via the Kubernetes API.
+ *
+ * NOTE: The service account of the application pod must have an RBAC role allowing "get" on the "pods" resource.
+ * If this application is running in a pod with multiple containers, the container that this application is running
+ * in must have an environment variable with the name specified in {@link #KUBERNETES_CONTAINER_NAME_ENV_VAR} set
+ * to the container name.
+ *
+ * @param context The vert.x context to run the code on to determine the container id via the K8s API.
+ * @return A future indicating the outcome of the operation.
+ * The future will be succeeded with the container id or {@code null} if not running in Kubernetes.
+ * In case of an error determining the container id, the future will be failed with either a
+ * {@link IllegalStateException} if a precondition for getting the id isn't fulfilled (because of missing
+ * permissions or because multiple pod containers exist and no KUBERNETES_CONTAINER_NAME env var is set),
+ * or otherwise a {@link ServerErrorException}.
+ * @throws NullPointerException if context is {@code null}.
+ */
+ public Future getContainerId(final Context context) {
+ Objects.requireNonNull(context);
+ if (containerId != null) {
+ return Future.succeededFuture(containerId);
+ }
+ if (!isRunningInKubernetes()) {
+ return Future.succeededFuture(null);
+ }
+ final String containerIdViaCgroup1 = CgroupV1KubernetesContainerUtil.getContainerId();
+ if (containerIdViaCgroup1 != null) {
+ containerId = containerIdViaCgroup1;
+ return Future.succeededFuture(containerId);
+ }
+ final Promise containerIdPromise = Promise.promise();
+ if (!containerIdPromiseRef.compareAndSet(null, containerIdPromise)) {
+ containerIdPromiseRef.get().future().onComplete(containerIdPromise);
+ LOG.debug("getContainerId result future will be completed with the result of an already ongoing invocation");
+ return containerIdPromise.future();
+ }
+ context.executeBlocking(codeHandler -> {
+ try {
+ containerId = getContainerIdViaK8sApi();
+ codeHandler.complete(containerId);
+ } catch (final Exception e) {
+ codeHandler.fail(e);
+ }
+ }, containerIdPromise);
+ containerIdPromise.future().onComplete(ar -> containerIdPromiseRef.set(null));
+ return containerIdPromise.future();
+ }
+
+ private String getContainerIdViaK8sApi() throws ServerErrorException, IllegalStateException {
+ try (KubernetesClient client = new KubernetesClientBuilder().build()) {
+ // container name env var needs to be set if there are multiple running containers in the pod
+ final String containerNameEnvVarValue = System.getenv(KUBERNETES_CONTAINER_NAME_ENV_VAR);
+ return getContainerIdViaK8sApi(client, getPodName(), containerNameEnvVarValue);
+
+ } catch (final KubernetesClientException e) {
+ if (e.getCause() != null && e.getCause().getMessage() != null
+ && e.getCause().getMessage().contains("timed out")) {
+ final String errorMsg = "Timed out getting container id via K8s API. Consider increasing " +
+ "the request timeout via the KUBERNETES_REQUEST_TIMEOUT env var (default is 10000[ms]).";
+ LOG.error(errorMsg);
+ throw new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, errorMsg);
+ }
+ throw new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR,
+ "Error getting container id via K8s API: " + e.getMessage());
+ }
+ }
+
+ static String getContainerIdViaK8sApi(final KubernetesClient client, final String podName,
+ final String containerNameEnvVarValue) throws KubernetesClientException, IllegalStateException {
+ LOG.info("get container id via K8s API");
+ try {
+ final String namespace = Optional.ofNullable(client.getNamespace()).orElse("default");
+ final Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+ if (pod == null) {
+ throw new KubernetesClientException("application pod not found in Kubernetes namespace " + namespace);
+ }
+ final List containerStatuses = PodStatusUtil.getContainerStatus(pod).stream()
+ .filter(KubernetesContainerInfoProvider::isContainerRunning).toList();
+ if (containerStatuses.isEmpty()) {
+ LOG.debug("got empty container statuses list");
+ throw new KubernetesClientException(
+ "no running container found in pod %s, namespace %s".formatted(podName, namespace));
+ }
+ final ContainerStatus foundContainerStatus;
+ if (containerStatuses.size() > 1) {
+ final String foundContainerNames = containerStatuses.stream().map(ContainerStatus::getName)
+ .collect(Collectors.joining(", "));
+ if (containerNameEnvVarValue == null) {
+ LOG.error(
+ "can't get container id: found multiple running containers, but {} env var is not set " +
+ "to specify which container to use; found containers [{}] in pod {}",
+ KUBERNETES_CONTAINER_NAME_ENV_VAR, foundContainerNames, podName);
+ throw new IllegalStateException(
+ ("can't get container id via K8s API: multiple running containers found; " +
+ "the %s env variable needs to be set for the container this application is running in, " +
+ "having the container name as value")
+ .formatted(KUBERNETES_CONTAINER_NAME_ENV_VAR));
+ }
+ LOG.info("multiple running containers found: {}", foundContainerNames);
+ LOG.info("using container name {} (derived from env var {}) to determine container id",
+ containerNameEnvVarValue, KUBERNETES_CONTAINER_NAME_ENV_VAR);
+ foundContainerStatus = containerStatuses.stream()
+ .filter(status -> status.getName().equals(containerNameEnvVarValue))
+ .findFirst()
+ .orElseThrow(() -> new KubernetesClientException(
+ "no running container with name %s found in pod %s, namespace %s"
+ .formatted(containerNameEnvVarValue, podName, namespace)));
+ } else {
+ foundContainerStatus = containerStatuses.get(0);
+ }
+ String containerId = foundContainerStatus.getContainerID();
+ // remove container runtime prefix (e.g. "containerd://")
+ final int delimIdx = containerId.lastIndexOf("://");
+ if (delimIdx > -1) {
+ containerId = containerId.substring(delimIdx + 3);
+ }
+ LOG.info("got container id via K8s API: {}", containerId);
+ return containerId;
+
+ } catch (final KubernetesClientException e) {
+ // rethrow error concerning missing RBAC role assignment as IllegalStateException to skip retry
+ // Error message looks like this:
+ // Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "XXX" is forbidden:
+ // User "XXX" cannot get resource "pods" in API group "" in the namespace "hono".
+ if (e.getMessage().contains("orbidden")) {
+ LOG.error("Error getting container id via K8s API: \n{}", e.getMessage());
+ throw new IllegalStateException("error getting container id via K8s API: " +
+ "application pod needs service account with role binding allowing 'get' on 'pods' resource");
+ }
+ LOG.error("Error getting container id via K8s API", e);
+ throw e;
+ }
+ }
+
+ private static boolean isContainerRunning(final ContainerStatus containerStatus) {
+ return containerStatus.getState() != null
+ && containerStatus.getState().getRunning() != null;
+ }
+}
diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java b/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java
index c7db5077df..8a9acd6c47 100644
--- a/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java
+++ b/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java
@@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -35,9 +37,13 @@
import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
+import io.vertx.core.Context;
import io.vertx.core.Future;
+import io.vertx.core.Promise;
import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
+import io.vertx.ext.healthchecks.Status;
/**
* A Protocol Adapter factory for creating consumers of command messages.
@@ -51,31 +57,41 @@ public class ProtocolAdapterCommandConsumerFactoryImpl implements ProtocolAdapte
private static final AtomicInteger ADAPTER_INSTANCE_ID_COUNTER = new AtomicInteger();
- /**
- * Identifier that has to be unique to this factory instance.
- * Will be used to represent the protocol adapter (verticle) instance that this factory instance is used in,
- * when registering command handlers with the command router service client.
- */
- private final String adapterInstanceId;
+ private final Vertx vertx;
+ private final int adapterInstanceIdCounterValue;
+ private final String adapterName;
private final CommandHandlers commandHandlers = new CommandHandlers();
private final CommandRouterClient commandRouterClient;
private final List internalCommandConsumers = new ArrayList<>();
+ private final AtomicBoolean stopCalled = new AtomicBoolean();
private int maxTenantIdsPerRequest = 100;
+ private KubernetesContainerInfoProvider kubernetesContainerInfoProvider = KubernetesContainerInfoProvider.getInstance();
+ private final List> internalCommandConsumerSuppliers = new ArrayList<>();
+ private HealthCheckHandler readinessHandler;
+ /**
+ * Identifier that has to be unique to this factory instance.
+ * Will be used to represent the protocol adapter (verticle) instance that this factory instance is used in,
+ * when registering command handlers with the command router service client.
+ */
+ private String adapterInstanceId;
+ private String startFailureMessage;
/**
* Creates a new factory.
*
+ * @param vertx The Vert.x instance to use.
* @param commandRouterClient The client to use for accessing the command router service.
* @param adapterName The name of the protocol adapter.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
- public ProtocolAdapterCommandConsumerFactoryImpl(final CommandRouterClient commandRouterClient, final String adapterName) {
+ public ProtocolAdapterCommandConsumerFactoryImpl(final Vertx vertx, final CommandRouterClient commandRouterClient,
+ final String adapterName) {
+ this.vertx = Objects.requireNonNull(vertx);
this.commandRouterClient = Objects.requireNonNull(commandRouterClient);
- Objects.requireNonNull(adapterName);
+ this.adapterName = Objects.requireNonNull(adapterName);
- this.adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(adapterName,
- ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement());
+ this.adapterInstanceIdCounterValue = ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement();
if (commandRouterClient instanceof ConnectionLifecycle>) {
((ConnectionLifecycle>) commandRouterClient).addReconnectListener(con -> reenableCommandRouting());
}
@@ -85,6 +101,10 @@ void setMaxTenantIdsPerRequest(final int count) {
this.maxTenantIdsPerRequest = count;
}
+ void setKubernetesContainerInfoProvider(final KubernetesContainerInfoProvider kubernetesContainerInfoProvider) {
+ this.kubernetesContainerInfoProvider = kubernetesContainerInfoProvider;
+ }
+
private void reenableCommandRouting() {
final List tenantIds = commandHandlers.getCommandHandlers().stream()
.map(CommandHandlerWrapper::getTenantId)
@@ -115,10 +135,7 @@ private void reenableCommandRouting() {
*/
public void registerInternalCommandConsumer(
final BiFunction internalCommandConsumerSupplier) {
- final InternalCommandConsumer consumer = internalCommandConsumerSupplier.apply(adapterInstanceId,
- commandHandlers);
- LOG.info("register internal command consumer {}", consumer.getClass().getSimpleName());
- internalCommandConsumers.add(consumer);
+ this.internalCommandConsumerSuppliers.add(internalCommandConsumerSupplier);
}
/**
@@ -131,18 +148,58 @@ public void registerInternalCommandConsumer(
*/
@Override
public Future start() {
- @SuppressWarnings("rawtypes")
- final List futures = internalCommandConsumers.stream()
- .map(Lifecycle::start)
- .collect(Collectors.toList());
- if (futures.isEmpty()) {
- return Future.failedFuture("no command consumer registered");
+ if (internalCommandConsumerSuppliers.isEmpty()) {
+ startFailureMessage = "no command consumer registered";
+ LOG.error("cannot start, {}", startFailureMessage);
+ return Future.failedFuture(startFailureMessage);
}
- return CompositeFuture.all(futures).mapEmpty();
+ return getK8sContainerId(1)
+ .compose(containerId -> {
+ adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(adapterName, containerId,
+ adapterInstanceIdCounterValue);
+ internalCommandConsumerSuppliers.stream()
+ .map(sup -> sup.apply(adapterInstanceId, commandHandlers))
+ .forEach(consumer -> {
+ LOG.info("created internal command consumer {}", consumer.getClass().getSimpleName());
+ internalCommandConsumers.add(consumer);
+ Optional.ofNullable(readinessHandler).ifPresent(consumer::registerReadinessChecks);
+ });
+ internalCommandConsumerSuppliers.clear();
+ readinessHandler = null;
+ @SuppressWarnings("rawtypes")
+ final List futures = internalCommandConsumers.stream()
+ .map(Lifecycle::start)
+ .collect(Collectors.toList());
+ if (futures.isEmpty()) {
+ return Future.failedFuture("no command consumer registered");
+ }
+ return CompositeFuture.all(futures).mapEmpty();
+ })
+ .recover(thr -> {
+ startFailureMessage = thr.getMessage();
+ return Future.failedFuture(thr);
+ }).mapEmpty();
+ }
+
+ private Future getK8sContainerId(final int attempt) {
+ final Context context = vertx.getOrCreateContext();
+ return kubernetesContainerInfoProvider.getContainerId(context)
+ .recover(thr -> {
+ if (thr instanceof IllegalStateException || stopCalled.get()) {
+ return Future.failedFuture(thr);
+ }
+ LOG.info("attempt {} to get K8s container id failed, trying again...", attempt);
+ final Promise containerIdPromise = Promise.promise();
+ context.runOnContext(action -> getK8sContainerId(attempt + 1).onComplete(containerIdPromise));
+ return containerIdPromise.future();
+ });
}
@Override
public Future stop() {
+ if (!stopCalled.compareAndSet(false, true)) {
+ return Future.succeededFuture();
+ }
@SuppressWarnings("rawtypes")
final List futures = internalCommandConsumers.stream()
.map(Lifecycle::stop)
@@ -152,7 +209,26 @@ public Future stop() {
@Override
public void registerReadinessChecks(final HealthCheckHandler readinessHandler) {
- internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler));
+ if (!internalCommandConsumers.isEmpty()) {
+ LOG.warn("registerReadinessChecks expected to be called before start()");
+ internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler));
+ return;
+ }
+ this.readinessHandler = readinessHandler;
+ readinessHandler.register("command-consumer-factory", 1000, this::checkIfInternalCommandConsumersCreated);
+ }
+
+ private void checkIfInternalCommandConsumersCreated(final Promise status) {
+ if (internalCommandConsumers.isEmpty() || startFailureMessage != null) {
+ final JsonObject data = new JsonObject();
+ if (startFailureMessage != null) {
+ LOG.error("failed to start command consumer factory: {}", startFailureMessage);
+ data.put("status", "startup of command consumer factory failed, check logs for details");
+ }
+ status.tryComplete(Status.KO(data));
+ } else {
+ status.tryComplete(Status.OK());
+ }
}
@Override public void registerLivenessChecks(final HealthCheckHandler livenessHandler) {
@@ -201,6 +277,9 @@ private Future doCreateCommandConsumer(
final Function> commandHandler,
final Duration lifespan,
final SpanContext context) {
+ if (adapterInstanceId == null) {
+ return Future.failedFuture("not started yet");
+ }
// lifespan greater than what can be expressed in nanoseconds (i.e. 292 years) is considered unlimited,
// preventing ArithmeticExceptions down the road
final Duration sanitizedLifespan = lifespan == null || lifespan.isNegative()
@@ -247,6 +326,9 @@ private Future removeCommandConsumer(
final Instant lifespanStart,
final SpanContext onCloseSpanContext) {
+ if (adapterInstanceId == null) {
+ return Future.failedFuture("not started yet");
+ }
final String tenantId = commandHandlerWrapper.getTenantId();
final String deviceId = commandHandlerWrapper.getDeviceId();
diff --git a/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java b/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java
new file mode 100644
index 0000000000..d5826a7626
--- /dev/null
+++ b/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *******************************************************************************/
+
+package org.eclipse.hono.client.command;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+
+/**
+ * Unit tests for {@link KubernetesContainerInfoProvider}.
+ */
+@EnableKubernetesMockClient(https = false)
+public class KubernetesContainerInfoProviderTest {
+
+ KubernetesMockServer server;
+ /**
+ * This client uses the namespace "test" (see KubernetesMockServer#createClient()).
+ */
+ KubernetesClient client;
+
+ /**
+ * Verifies that getting the container id via the K8s API succeeds.
+ */
+ @Test
+ public void testGetContainerIdViaK8sApi() {
+ final String podName = "testPod0";
+ final String containerId = getRandomContainerId();
+ final String containerIdWithPrefix = "containerd://" + containerId;
+ final String containerNameEnvVarValue = null; // should not be needed in this test (only one running container)
+
+ // GIVEN a Kubernetes API service mock returning a pod with a running container
+ final ContainerStatus containerStatus = createRunningContainerStatus(containerIdWithPrefix, "testContainer0");
+ final Pod pod = createPod(podName, List.of(containerStatus));
+ server.expect()
+ .withPath("/api/v1/namespaces/test/pods/" + podName)
+ .andReturn(200, pod)
+ .once();
+ // WHEN invoking getContainerIdViaK8sApi
+ final String extractedContainerId = KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName,
+ containerNameEnvVarValue);
+ // THEN the expected id was returned
+ assertThat(extractedContainerId).isEqualTo(containerId);
+ }
+
+ /**
+ * Verifies that getting the container id via the K8s API succeeds when there are multiple running pods.
+ */
+ @Test
+ public void testGetContainerIdViaK8sApiWithMultipleRunningContainers() {
+ final String podName = "testPod0";
+ final String containerId1WithPrefix = "containerd://" + getRandomContainerId();
+ final String containerId2 = getRandomContainerId();
+ final String containerId2WithPrefix = "containerd://" + containerId2;
+ final String containerName1 = "testContainer1";
+ final String containerName2 = "testContainer2";
+ final ContainerStatus containerStatus1 = createRunningContainerStatus(containerId1WithPrefix, containerName1);
+ final ContainerStatus containerStatus2 = createRunningContainerStatus(containerId2WithPrefix, containerName2);
+
+ // GIVEN a Kubernetes API service mock returning a pod with multiple running containers
+ final Pod pod = createPod(podName, List.of(containerStatus1, containerStatus2));
+ server.expect()
+ .withPath("/api/v1/namespaces/test/pods/" + podName)
+ .andReturn(200, pod)
+ .once();
+ // WHEN invoking getContainerIdViaK8sApi, setting "testContainer2" as the KUBERNETES_CONTAINER_NAME env var value
+ final String extractedContainerId = KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName,
+ containerName2);
+ // THEN the container id of "testContainer2" is returned
+ assertThat(extractedContainerId).isEqualTo(containerId2);
+ }
+
+ /**
+ * Verifies that getting the container id via the K8s API fails if there are multiple running pods, but no
+ * environment variable is set to specify the container name.
+ */
+ @Test
+ public void testGetContainerIdViaK8sApiWithMultipleRunningContainersButNoContainerNameEnvVar() {
+ final String podName = "testPod0";
+ final String containerId1 = getRandomContainerId();
+ final String containerId2 = getRandomContainerId();
+ final String containerName1 = "testContainer1";
+ final String containerName2 = "testContainer2";
+ final ContainerStatus containerStatus1 = createRunningContainerStatus(containerId1, containerName1);
+ final ContainerStatus containerStatus2 = createRunningContainerStatus(containerId2, containerName2);
+ final String containerNameEnvVarValue = null;
+
+ // GIVEN a Kubernetes API service mock returning a pod with multiple running containers
+ final Pod pod = createPod(podName, List.of(containerStatus1, containerStatus2));
+ server.expect()
+ .withPath("/api/v1/namespaces/test/pods/" + podName)
+ .andReturn(200, pod)
+ .once();
+
+ // WHEN invoking getContainerIdViaK8sApi with a null KUBERNETES_CONTAINER_NAME env var value
+ // THEN an IllegalStateException is thrown
+ final IllegalStateException thrown = assertThrows(
+ IllegalStateException.class,
+ () -> KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName, containerNameEnvVarValue));
+ assertThat(thrown.getMessage()).contains("multiple running containers");
+ }
+
+ private static ContainerStatus createRunningContainerStatus(final String containerIdWithPrefix,
+ final String containerName) {
+ return new ContainerStatusBuilder()
+ .withContainerID(containerIdWithPrefix)
+ .withName(containerName)
+ .withState(new ContainerStateBuilder().withNewRunning().endRunning().build())
+ .build();
+ }
+
+ private Pod createPod(final String podName, final List containerStatuses) {
+ final PodStatus podStatus = new PodStatusBuilder()
+ .withContainerStatuses(containerStatuses)
+ .build();
+ return new PodBuilder()
+ .withNewMetadata()
+ .withName(podName)
+ .endMetadata()
+ .withStatus(podStatus)
+ .build();
+ }
+
+ private static String getRandomContainerId() {
+ return UUID.randomUUID().toString().concat(UUID.randomUUID().toString()).replaceAll("-", "");
+ }
+}
diff --git a/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java b/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java
index 02a9785842..74a3737542 100644
--- a/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java
+++ b/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
+ * Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
@@ -28,15 +28,19 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
+import java.util.function.BiFunction;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.ReconnectListener;
+import org.eclipse.hono.test.VertxMockSupport;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import io.vertx.core.Future;
+import io.vertx.core.Vertx;
/**
* Tests verifying behavior of {@link ProtocolAdapterCommandConsumerFactoryImpl}.
@@ -45,17 +49,30 @@
public class ProtocolAdapterCommandConsumerFactoryTest {
private CommandRouterClient commandRouterClient;
+ private KubernetesContainerInfoProvider kubernetesContainerInfoProvider;
+ private BiFunction internalCommandConsumerSupplier;
private ProtocolAdapterCommandConsumerFactoryImpl factory;
/**
* Sets up the fixture.
*/
+ @SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
commandRouterClient = mock(CommandRouterClient.class, withSettings().extraInterfaces(ConnectionLifecycle.class));
when(commandRouterClient.registerCommandConsumer(anyString(), anyString(), anyBoolean(), anyString(), any(Duration.class), any()))
.thenReturn(Future.succeededFuture());
- factory = new ProtocolAdapterCommandConsumerFactoryImpl(commandRouterClient, "test-adapter");
+ kubernetesContainerInfoProvider = mock(KubernetesContainerInfoProvider.class);
+ when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn(Future.succeededFuture(null));
+ final Vertx vertx = mock(Vertx.class);
+ VertxMockSupport.executeBlockingCodeImmediately(vertx);
+ factory = new ProtocolAdapterCommandConsumerFactoryImpl(vertx, commandRouterClient, "test-adapter");
+ factory.setKubernetesContainerInfoProvider(kubernetesContainerInfoProvider);
+ internalCommandConsumerSupplier = mock(BiFunction.class);
+ final InternalCommandConsumer internalCommandConsumer = mock(InternalCommandConsumer.class);
+ when(internalCommandConsumer.start()).thenReturn(Future.succeededFuture());
+ when(internalCommandConsumerSupplier.apply(anyString(), any())).thenReturn(internalCommandConsumer);
+ factory.registerInternalCommandConsumer(internalCommandConsumerSupplier);
}
@SuppressWarnings("unchecked")
@@ -67,6 +84,7 @@ void testFactoryReenablesCommandRouting() {
final ArgumentCaptor> reconnectListener = ArgumentCaptor.forClass(ReconnectListener.class);
verify(conLifecycle).addReconnectListener(reconnectListener.capture());
factory.setMaxTenantIdsPerRequest(2);
+ factory.start();
factory.createCommandConsumer("tenant1", "device1", "adapter", true, ctx -> Future.succeededFuture(), Duration.ofMinutes(10), null);
factory.createCommandConsumer("tenant2", "device2", "adapter", true, ctx -> Future.succeededFuture(), Duration.ofMinutes(10), null);
@@ -89,4 +107,50 @@ void testFactoryReenablesCommandRouting() {
}), any());
assertThat(enabledTenants).containsExactly("tenant1", "tenant2", "tenant3");
}
+
+ /**
+ * Verifies that if the command consumer factory is used within a Kubernetes container, the internal command
+ * consumer is initialized with an adapter instance id containing the container id.
+ */
+ @Test
+ void testFactoryCreatesInternalConsumersWithAdapterInstanceId() {
+ final String containerId = getTestContainerId();
+ final String shortContainerId = containerId.substring(0, 12);
+ // GIVEN a scenario where the application runs in a Kubernetes container
+ when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn(Future.succeededFuture(containerId));
+ // WHEN starting the command consumer factory
+ factory.start();
+
+ // THEN the internal command consumer has been initialized with an adapter instance id containing the container id
+ verify(internalCommandConsumerSupplier).apply(argThat(adapterInstanceId -> {
+ return adapterInstanceId.contains(shortContainerId);
+ }), any());
+ }
+
+ /**
+ * Verifies that if the command consumer factory is used within a Kubernetes container, there are multiple
+ * attempts being made to determine the container id on startup.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ void testFactoryRetriesGetContainerId() {
+ final String containerId = getTestContainerId();
+ final String shortContainerId = containerId.substring(0, 12);
+ // GIVEN a scenario where the application runs in a Kubernetes container, but the container id can only be
+ // determined on the 3rd attempt
+ when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn(
+ Future.failedFuture("some error"),
+ Future.failedFuture("some error"),
+ Future.succeededFuture(containerId));
+ // WHEN starting the command consumer factory
+ factory.start();
+ // THEN the internal command consumer has been initialized with an adapter instance id containing the container id
+ verify(internalCommandConsumerSupplier).apply(argThat(adapterInstanceId -> {
+ return adapterInstanceId.contains(shortContainerId);
+ }), any());
+ }
+
+ private static String getTestContainerId() {
+ return UUID.randomUUID().toString().concat(UUID.randomUUID().toString()).replaceAll("-", "");
+ }
}