Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move KubernetesContainerUtil, adapterInstanceId methods. #3545

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.eclipse.hono.util;
package org.eclipse.hono.client.command;

import java.io.File;
import java.io.IOException;
Expand All @@ -31,14 +31,14 @@
* This class has been copied from <em>org.apache.logging.log4j.kubernetes.ContainerUtil</em> from the
* <em>log4j-kubernetes</em> module of the <a href="https://github.com/apache/logging-log4j2">Apache Log4j 2</a>
* project (commit a50abb9). Adaptations have been done concerning the used logger and the Hono code style.
* Also a fix regarding cri-containerd container ids has been applied.
* Also, a fix regarding cri-containerd container ids has been applied.
*/
public class KubernetesContainerUtil {
public class CgroupV1KubernetesContainerUtil {
private static final int MAXLENGTH = 65;

private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesContainerUtil.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CgroupV1KubernetesContainerUtil.class);

private KubernetesContainerUtil() {
private CgroupV1KubernetesContainerUtil() {
}

/**
Expand All @@ -60,16 +60,16 @@ public static String getContainerId() {
final File file = new File("/proc/self/cgroup");
if (file.exists()) {
try (Stream<String> lines = Files.lines(file.toPath())) {
final String id = lines.map(KubernetesContainerUtil::getContainerId).filter(Objects::nonNull)
final String id = lines.map(CgroupV1KubernetesContainerUtil::getContainerId).filter(Objects::nonNull)
.findFirst().orElse(null);
LOGGER.debug("Found container id {}", id);
LOGGER.debug("Found container id via cgroup v1: {}", id);
return id;
}
} else {
LOGGER.warn("Unable to access container information");
LOGGER.warn("Unable to access '/proc/self/cgroup' to get container information");
}
} catch (final IOException ioe) {
LOGGER.warn("Error obtaining container id: {}", ioe.getMessage());
LOGGER.warn("Error obtaining container id via cgroup v1: {}", ioe.getMessage());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*******************************************************************************
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.client.command;

import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.Strings;

/**
* Utility methods used in connection with routing of Command and Control messages.
*/
public class CommandRoutingUtil {

/**
* Pattern of the adapter instance identifier, used when routing a command message to a protocol
* adapter running in a Kubernetes cluster.
* <p>
* The first matcher group contains the pod name, the second matcher group contains the first 12 characters of the
* docker container id of the adapter instance.
*/
private static final Pattern KUBERNETES_ADAPTER_INSTANCE_ID_PATTERN = Pattern.compile("^(.*)_([0-9a-f]{12})_\\d+$");

private CommandRoutingUtil() {
// prevent instantiation
}

/**
* Creates a new adapter instance identifier, used for identifying the protocol adapter to route a command to.
* <p>
* If this method is invoked from within a docker container in a Kubernetes cluster, the format is
* <em>[prefix]_[docker_container_id]_[counter]</em>, with prefix being the name of the Kubernetes pod.
* See also {@link #getK8sPodNameAndContainerIdFromAdapterInstanceId(String)}.
* <p>
* If not running in a Kubernetes cluster, a random id with the given adapter name as prefix is used.
*
* @param adapterName The adapter name.
* @param counter The counter value to use.
* @return The new adapter instance identifier.
*/
public static String getNewAdapterInstanceId(final String adapterName, final int counter) {
final String k8sContainerId = CgroupV1KubernetesContainerUtil.getContainerId();
if (k8sContainerId == null || k8sContainerId.length() < 12) {
return getNewAdapterInstanceIdForNonK8sEnv(adapterName);
} else {
// running in Kubernetes: prefer HOSTNAME env var containing the pod name
String prefix = System.getenv("HOSTNAME");
if (Strings.isNullOrEmpty(prefix)) {
prefix = adapterName;
}
return getNewAdapterInstanceIdForK8sEnv(prefix, k8sContainerId, counter);
}
}

/**
* Creates a new adapter instance identifier for use in a non-Kubernetes environment.
* <p>
* The format is <em>[adapterName]_[uuid]</em>.
*
* @param adapterName The adapter name to use.
* @return The new adapter instance identifier.
*/
public static String getNewAdapterInstanceIdForNonK8sEnv(final String adapterName) {
final String prefix = Strings.isNullOrEmpty(adapterName) ? ""
: adapterName.replaceAll("[^a-zA-Z0-9._-]", "") + "-";
return prefix + UUID.randomUUID();
}

/**
* Creates a new adapter instance identifier for use in a Kubernetes environment.
* <p>
* The format is <em>[pod_name]_[docker_container_id]_[counter]</em>.
*
* @param podName The pod name to use.
* @param containerId The container identifier to use.
* @param counter The counter value to use.
* @return The new adapter instance identifier.
* @throws NullPointerException If containerId is {@code null}.
*/
public static String getNewAdapterInstanceIdForK8sEnv(final String podName, final String containerId, final int counter) {
Objects.requireNonNull(containerId);
// replace special characters so that the id can be used in a Kafka topic name
final String podNameToUse = Optional.ofNullable(podName)
.map(p -> p.replaceAll("[^a-zA-Z0-9._-]", "")).orElse("");
return String.format("%s_%s_%d",
podNameToUse,
containerId.substring(0, 12),
counter);
}

/**
* Gets the pod name and container identifier from a given adapter instance identifier that was created via
* {@link #getNewAdapterInstanceIdForK8sEnv(String, String, int)}.
*
* @param adapterInstanceId The adapter instance identifier.
* @return The pod name and container identifier pair or {@code null} if the adapter instance identifier didn't
* match.
* @throws NullPointerException If adapterInstanceId is {@code null}.
*/
public static Pair<String, String> getK8sPodNameAndContainerIdFromAdapterInstanceId(final String adapterInstanceId) {
Objects.requireNonNull(adapterInstanceId);
final Matcher matcher = KUBERNETES_ADAPTER_INSTANCE_ID_PATTERN.matcher(adapterInstanceId);
if (!matcher.matches()) {
return null;
}
return Pair.of(matcher.group(1), matcher.group(2));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -28,7 +28,6 @@
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.TenantConstants;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,7 +74,7 @@ public ProtocolAdapterCommandConsumerFactoryImpl(final CommandRouterClient comma
this.commandRouterClient = Objects.requireNonNull(commandRouterClient);
Objects.requireNonNull(adapterName);

this.adapterInstanceId = CommandConstants.getNewAdapterInstanceId(adapterName,
this.adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(adapterName,
ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement());
if (commandRouterClient instanceof ConnectionLifecycle<?>) {
((ConnectionLifecycle<?>) commandRouterClient).addReconnectListener(con -> reenableCommandRouting());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -11,16 +11,16 @@
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.util;
package org.eclipse.hono.client.command;

import static com.google.common.truth.Truth.assertThat;

import org.junit.jupiter.api.Test;

/**
* Unit tests for {@code KubernetesContainerUtil}.
* Unit tests for {@link CgroupV1KubernetesContainerUtil}.
*/
public class KubernetesContainerUtilTest {
public class CgroupV1KubernetesContainerUtilTest {

/**
* Tests extracting the container id from a container created by the containerd CRI plugin.
Expand All @@ -30,7 +30,7 @@ public void testGetContainerIdForContainerdCriContainer() {
final String containerId = "118cc69780e057ab94ed0526d2f05ad61cf208f1175bab24bba25c1d826aac82";
final String cgroupLine = "6:cpuset:/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod65b4c0a2_51e8_4d62_a015_1c096724b473.slice/"
+ "cri-containerd-118cc69780e057ab94ed0526d2f05ad61cf208f1175bab24bba25c1d826aac82.scope";
final String extractedContainerId = KubernetesContainerUtil.getContainerId(cgroupLine);
final String extractedContainerId = CgroupV1KubernetesContainerUtil.getContainerId(cgroupLine);
assertThat(extractedContainerId).isEqualTo(containerId);
}

Expand All @@ -42,7 +42,7 @@ public void testGetContainerIdForDockerContainer() {
final String containerId = "3dd988081e7149463c043b5d9c57d7309e079c5e9290f91feba1cc45a04d6a5b";
final String cgroupLine = "8:cpuset:/kubepods.slice/kubepods-pod9c26dfb6_b9c9_11e7_bfb9_02c6c1fc4861.slice/"
+ "docker-3dd988081e7149463c043b5d9c57d7309e079c5e9290f91feba1cc45a04d6a5b.scope";
final String extractedContainerId = KubernetesContainerUtil.getContainerId(cgroupLine);
final String extractedContainerId = CgroupV1KubernetesContainerUtil.getContainerId(cgroupLine);
assertThat(extractedContainerId).isEqualTo(containerId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -11,26 +11,27 @@
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.util;
package org.eclipse.hono.client.command;

import static com.google.common.truth.Truth.assertThat;

import org.eclipse.hono.util.Pair;
import org.junit.jupiter.api.Test;


/**
* Tests verifying behavior of {@link CommandConstants}.
* Tests verifying behavior of {@link CommandRoutingUtil}.
*
*/
public class CommandConstantsTest {
public class CommandRoutingUtilTest {

@Test
void testGetK8sPodNameAndContainerIdFromAdapterInstanceId() {

final String podName = "myPodName";
final String containerId = "012345678901";
final String newAdapterInstanceId = CommandConstants.getNewAdapterInstanceIdForK8sEnv(podName, containerId, 1);
final Pair<String, String> podNameAndContainerId = CommandConstants.getK8sPodNameAndContainerIdFromAdapterInstanceId(
final String newAdapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceIdForK8sEnv(podName, containerId, 1);
final Pair<String, String> podNameAndContainerId = CommandRoutingUtil.getK8sPodNameAndContainerIdFromAdapterInstanceId(
newAdapterInstanceId);
assertThat(podNameAndContainerId).isNotNull();
assertThat(podNameAndContainerId.one()).isEqualTo(podName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -21,7 +21,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.CommonClientConfigs;
import org.eclipse.hono.util.KubernetesContainerUtil;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,6 +32,8 @@ public abstract class AbstractKafkaConfigProperties {

private static final AtomicInteger ID_COUNTER = new AtomicInteger();

private static final String COMPONENT_UID_DEFAULT = getK8sComponentUId();

/**
* A logger to be shared with subclasses.
*/
Expand All @@ -50,15 +51,15 @@ public abstract class AbstractKafkaConfigProperties {
* Creates a new instance.
*/
protected AbstractKafkaConfigProperties() {
this.componentUId = getComponentUIdFromEnv();
this.componentUId = COMPONENT_UID_DEFAULT;
}

private String getComponentUIdFromEnv() {
final String k8sContainerId = KubernetesContainerUtil.getContainerId();
if (k8sContainerId != null && k8sContainerId.length() >= 12) {
private static String getK8sComponentUId() {
if (System.getenv("KUBERNETES_SERVICE_HOST") != null) {
// running in Kubernetes: use HOSTNAME env var containing the pod name
final String podName = System.getenv("HOSTNAME");
return String.format("%s_%s", podName, k8sContainerId.substring(0, 12));
final String random = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 12);
calohmn marked this conversation as resolved.
Show resolved Hide resolved
return String.format("%s_%s", podName, random);
}
return null;
}
Expand Down Expand Up @@ -88,7 +89,7 @@ protected final void setSpecificClientConfig(final Map<String, String> specificC
* Sets the component unique ID to be included in the client ID along with a counter value.
* <p>
* This overrides the unique ID determined automatically in case this application is running in Kubernetes
* (consisting of Pod/Container ID then).
* (containing the Pod name then).
* <p>
* Setting {@code null} here means that a UUID will be used in the client ID instead of component unique ID and
* counter value.
Expand Down Expand Up @@ -133,7 +134,7 @@ public String getBootstrapServers() {
* has been applied.
* <p>
* It is ensured that the returned map contains a unique {@code client.id}. The client ID will be created from the
* given client name, followed by a unique ID (containing component identifiers if running in Kubernetes).
* given client name, followed by a unique ID (containing a component identifier if running in Kubernetes).
* An already set {@code client.id} property value will be used as prefix for the client ID.
*
* @param clientName A name for the client to include in the added {@code client.id} property.
Expand Down
Loading