diff --git a/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java b/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java index be95e939bd..f34bbd072d 100644 --- a/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java +++ b/client/java-armeria-legacy/src/main/java/com/linecorp/centraldogma/client/armeria/legacy/LegacyCentralDogma.java @@ -633,6 +633,11 @@ private static Throwable convertCause(Throwable cause) { return convertedCause; } + @Override + public void close() { + endpointGroup.close(); + } + @FunctionalInterface private interface ThriftCall { void apply(ThriftFuture callback) throws TException; diff --git a/client/java-armeria-xds/build.gradle b/client/java-armeria-xds/build.gradle new file mode 100644 index 0000000000..23337ec0ea --- /dev/null +++ b/client/java-armeria-xds/build.gradle @@ -0,0 +1,11 @@ +dependencies { + api project(':client:java-armeria') + + // Armeria + api libs.armeria.xds + + testImplementation libs.controlplane.server + testImplementation libs.controlplane.cache + testImplementation libs.armeria.junit5 + testImplementation libs.jackson.dataformat.yaml +} diff --git a/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java new file mode 100644 index 0000000000..edb201422d --- /dev/null +++ b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/XdsCentralDogmaBuilder.java @@ -0,0 +1,325 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; +import com.google.protobuf.Any; + +import com.linecorp.armeria.client.ClientBuilder; +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.encoding.DecodingClient; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.armeria.xds.client.endpoint.XdsEndpointGroup; +import com.linecorp.centraldogma.client.AbstractCentralDogmaBuilder; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.client.armeria.ArmeriaClientConfigurator; +import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.DynamicResources; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.StaticResources; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.core.v3.Address; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; +import io.envoyproxy.envoy.config.core.v3.ApiConfigSource.ApiType; +import io.envoyproxy.envoy.config.core.v3.GrpcService; +import io.envoyproxy.envoy.config.core.v3.GrpcService.EnvoyGrpc; +import io.envoyproxy.envoy.config.core.v3.HeaderValue; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Node; +import io.envoyproxy.envoy.config.core.v3.SocketAddress; +import io.envoyproxy.envoy.config.core.v3.TransportSocket; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; + +/** + * Builds a {@link CentralDogma} client based on an Armeria + * HTTP client. + * This client differs from {@link ArmeriaCentralDogma} in that making requests is done in two phases. + *
    + *
  1. + * An xDS request is made to xDS servers to fetch a {@link Listener} resource which represents + * how to connect to {@link CentralDogma} servers. + *
  2. + *
  3. + * Actual {@link CentralDogma} client requests are made based on the watched Listener resource. + *
  4. + *
+ * Because connection to the actual {@link CentralDogma} is determined by the watched {@link Listener}, + * {@link #hosts()} and {@link #isUseTls()} apply to connecting to the bootstrap only. + * However, because Armeria's xDS implementation isn't complete, the following parameters are applied + * to both xDS and Central Dogma requests. + * + * + *

Note that this module is considered experimental and subject to behavioral change. + */ +@UnstableApi +public final class XdsCentralDogmaBuilder extends AbstractCentralDogmaBuilder { + + private static final String BOOTSTRAP_CLUSTER_NAME = "centraldogma-bootstrap-cluster"; + @VisibleForTesting + static final String DEFAULT_LISTENER_NAME = "centraldogma-listener"; + + private ScheduledExecutorService blockingTaskExecutor = CommonPools.blockingTaskExecutor(); + private ClientFactory clientFactory = ClientFactory.ofDefault(); + private ArmeriaClientConfigurator clientConfigurator = cb -> {}; + private String listenerName = DEFAULT_LISTENER_NAME; + private Locality locality = Locality.getDefaultInstance(); + // an empty string means no local cluster + private String serviceCluster = ""; + + // TODO: @jrhee17 remove this once xDS TLS is fully supported + private Function xdsBootstrapFactory = XdsBootstrap::of; + + /** + * Sets the name of the {@link Listener} that should be requested to the xDS bootstrap servers. + * The default is {@value #DEFAULT_LISTENER_NAME}. + */ + public XdsCentralDogmaBuilder listenerName(String listenerName) { + requireNonNull(listenerName, "listenerName"); + this.listenerName = listenerName; + return this; + } + + /** + * Sets the locality of where the {@link CentralDogma} client will be running. This may be used in applying + * zone aware routing + * and is analogous to + * service-zone. + * This value will be set to {@link Node#getLocality()} in the {@link Bootstrap}. + */ + @UnstableApi + public XdsCentralDogmaBuilder serviceZone(String serviceZone) { + requireNonNull(serviceZone, "serviceZone"); + locality = Locality.newBuilder().setZone(serviceZone).build(); + return this; + } + + /** + * Sets the name of the local service cluster which this client will be located in. + * This may be used in applying + * zone aware routing + * and is analogous to + * service-cluster. + * This value will be set to {@link Node#getCluster()} in the {@link Bootstrap}. + */ + @UnstableApi + public XdsCentralDogmaBuilder serviceCluster(String serviceCluster) { + requireNonNull(serviceCluster, "serviceCluster"); + this.serviceCluster = serviceCluster; + return this; + } + + /** + * Sets the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations. + * If not set, {@linkplain CommonPools#blockingTaskExecutor() the common pool} is used. + * The {@link ScheduledExecutorService} which will be used for scheduling the tasks related with + * automatic retries and invoking the callbacks for watched changes. + */ + public XdsCentralDogmaBuilder blockingTaskExecutor(ScheduledExecutorService blockingTaskExecutor) { + requireNonNull(blockingTaskExecutor, "blockingTaskExecutor"); + this.blockingTaskExecutor = blockingTaskExecutor; + return this; + } + + /** + * Sets the {@link ArmeriaClientConfigurator} that will configure an underlying + * Armeria client which performs the actual socket I/O. + * + *

Note that this doesn't affect the client making requests to the bootstrap servers. + */ + public XdsCentralDogmaBuilder clientConfigurator(ArmeriaClientConfigurator clientConfigurator) { + this.clientConfigurator = requireNonNull(clientConfigurator, "clientConfigurator"); + return this; + } + + /** + * Sets the {@link ClientFactory} that will create an underlying + * Armeria client which performs the actual socket I/O. + * + *

Note that this doesn't affect the client making requests to the bootstrap servers. + */ + public XdsCentralDogmaBuilder clientFactory(ClientFactory clientFactory) { + this.clientFactory = requireNonNull(clientFactory, "clientFactory"); + return this; + } + + @VisibleForTesting + XdsCentralDogmaBuilder xdsBoostrapFactory(Function xdsBootstrapFactory) { + this.xdsBootstrapFactory = requireNonNull(xdsBootstrapFactory, "xdsBootstrapFactory"); + return this; + } + + /** + * Returns a newly-created {@link CentralDogma} instance. + */ + public CentralDogma build() { + final XdsBootstrap xdsBootstrap = xdsBootstrap(); + final String listenerName = this.listenerName; + final EndpointGroup endpointGroup = XdsEndpointGroup.of(xdsBootstrap.listenerRoot(listenerName)); + final String scheme = "none+" + (isUseTls() ? "https" : "http"); + final ClientBuilder builder = + newClientBuilder(scheme, endpointGroup, cb -> cb.decorator(DecodingClient.newDecorator()), "/"); + final int maxRetriesOnReplicationLag = maxNumRetriesOnReplicationLag(); + + // TODO(ikhoon): Apply ExecutorServiceMetrics for the 'blockingTaskExecutor' once + // https://github.com/line/centraldogma/pull/542 is merged. + final ScheduledExecutorService blockingTaskExecutor = this.blockingTaskExecutor; + + final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor, + builder.build(WebClient.class), + accessToken(), + () -> { + endpointGroup.close(); + xdsBootstrap.close(); + }); + if (maxRetriesOnReplicationLag <= 0) { + return dogma; + } else { + return new ReplicationLagTolerantCentralDogma( + blockingTaskExecutor, dogma, maxRetriesOnReplicationLag, + retryIntervalOnReplicationLagMillis(), + () -> { + // FIXME(trustin): Note that this will always return `null` due to a known limitation + // in Armeria: https://github.com/line/armeria/issues/760 + final ClientRequestContext ctx = ClientRequestContext.currentOrNull(); + return ctx != null ? ctx.remoteAddress() : null; + }); + } + } + + private ClientBuilder newClientBuilder(String scheme, EndpointGroup endpointGroup, + Consumer customizer, String path) { + final ClientBuilder builder = Clients.builder(scheme, endpointGroup, path); + customizer.accept(builder); + clientConfigurator.configure(builder); + builder.factory(clientFactory); + return builder; + } + + private boolean isUnresolved() { + final Set hosts = hosts(); + checkState(!hosts.isEmpty(), "No hosts were added."); + final Map> addrByUnresolved = + hosts.stream().collect(Collectors.partitioningBy(InetSocketAddress::isUnresolved)); + // Until multiple clusters are supported, restrict users to either use STATIC or DNS (but not both) + checkState(addrByUnresolved.get(true).isEmpty() || + addrByUnresolved.get(false).isEmpty(), + "Cannot mix resolved and unresolved hosts (%s)", addrByUnresolved); + final InetSocketAddress firstHost = Iterables.get(hosts(), 0); + return firstHost.isUnresolved(); + } + + private XdsBootstrap xdsBootstrap() { + final GrpcService grpcService = GrpcService + .newBuilder() + .setEnvoyGrpc(EnvoyGrpc.newBuilder() + .setClusterName(BOOTSTRAP_CLUSTER_NAME)) + .addInitialMetadata(HeaderValue.newBuilder() + .setKey(HttpHeaderNames.AUTHORIZATION.toString()) + .setValue("Bearer " + accessToken())) + .build(); + final ApiConfigSource apiConfigSource = ApiConfigSource + .newBuilder() + .addGrpcServices(grpcService) + .setApiType(ApiType.AGGREGATED_GRPC) + .build(); + final DynamicResources dynamicResources = + DynamicResources.newBuilder().setAdsConfig(apiConfigSource).build(); + final Bootstrap bootstrap = + Bootstrap.newBuilder() + .setDynamicResources(dynamicResources) + .setNode(Node.newBuilder() + .setCluster(serviceCluster) + .setLocality(locality)) + .setStaticResources(StaticResources.newBuilder().addClusters(bootstrapCluster())) + .build(); + return xdsBootstrapFactory.apply(bootstrap); + } + + private Cluster bootstrapCluster() { + final boolean isUnresolved = isUnresolved(); + + final Cluster.Builder clusterBuilder = Cluster.newBuilder(); + if (isUnresolved) { + clusterBuilder.setType(DiscoveryType.STRICT_DNS); + } else { + clusterBuilder.setType(DiscoveryType.STATIC); + } + + final LocalityLbEndpoints.Builder localityLbEndpointsBuilder = LocalityLbEndpoints.newBuilder(); + for (InetSocketAddress addr : hosts()) { + final LbEndpoint lbEndpoint = fromAddress(addr); + localityLbEndpointsBuilder.addLbEndpoints(lbEndpoint); + } + final ClusterLoadAssignment clusterLoadAssignment = + ClusterLoadAssignment.newBuilder().addEndpoints(localityLbEndpointsBuilder.build()).build(); + + if (isUseTls()) { + clusterBuilder.setTransportSocket( + TransportSocket.newBuilder() + .setName("envoy.transport_sockets.tls") + .setTypedConfig(Any.pack(UpstreamTlsContext.getDefaultInstance()))); + } + + clusterBuilder.setLoadAssignment(clusterLoadAssignment) + .setName(BOOTSTRAP_CLUSTER_NAME); + return clusterBuilder.build(); + } + + private static LbEndpoint fromAddress(InetSocketAddress addr) { + final String hostString = addr.getHostString(); + final int port = addr.getPort(); + final Address address = Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder() + .setAddress(hostString) + .setPortValue(port)).build(); + return LbEndpoint.newBuilder() + .setEndpoint(Endpoint.newBuilder() + .setAddress(address)) + .build(); + } +} diff --git a/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java new file mode 100644 index 0000000000..ecb7f87ce6 --- /dev/null +++ b/client/java-armeria-xds/src/main/java/com/linecorp/centraldogma/client/armeria/xds/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Armeria's xDS-based Central Dogma client implementation. + * @see Java client library + */ +@NonNullByDefault +package com.linecorp.centraldogma.client.armeria.xds; + +import com.linecorp.centraldogma.common.util.NonNullByDefault; diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java new file mode 100644 index 0000000000..95b9757439 --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/AuthUpstreamTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.linecorp.armeria.common.util.UnmodifiableFuture.completedFuture; +import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.getAccessToken; +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.common.auth.OAuth2Token; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.auth.AuthService; +import com.linecorp.armeria.server.auth.Authorizer; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil; +import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class AuthUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension() { + + @Override + protected void configure(CentralDogmaBuilder builder) { + builder.administrators(TestAuthMessageUtil.USERNAME); + builder.authProviderFactory(new TestAuthProviderFactory()); + } + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + private static final AtomicReference accessTokenRef = new AtomicReference<>(); + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + final Authorizer tokenAuthorizer = + (ctx, token) -> completedFuture(accessTokenRef.get().equals(token.accessToken())); + sb.decorator(AuthService.builder().addOAuth2(tokenAuthorizer).newDecorator()); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @Test + void basicAuthCase() throws Exception { + final String accessToken = getAccessToken(dogma.httpClient(), TestAuthMessageUtil.USERNAME, + TestAuthMessageUtil.PASSWORD); + // so that the xds server can also verify the access token is correctly set + accessTokenRef.set(accessToken); + + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .accessToken(accessToken) + .host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java new file mode 100644 index 0000000000..44c0c3f52b --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/SimpleXdsUpstreamTest.java @@ -0,0 +1,204 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.server.CentralDogmaBuilder; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class SimpleXdsUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.port(0, SessionProtocol.HTTP); + sb.port(0, SessionProtocol.HTTP); + sb.port(0, SessionProtocol.HTTP); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension() { + + @Override + protected void configure(CentralDogmaBuilder builder) { + builder.port(0, SessionProtocol.HTTP); + builder.port(0, SessionProtocol.HTTP); + } + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + @Test + void singleBootstrapSingleUpstream() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder().host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } + + @Test + void multiBootstrapMultiUpstream() throws Exception { + final List dogmaPorts = dogma.dogma().activePorts().values().stream().map( + port -> port.localAddress().getPort()).collect(Collectors.toList()); + final List serverPorts = server.server().activePorts().values().stream().map( + port -> port.localAddress().getPort()).collect(Collectors.toList()); + assertThat(dogmaPorts).hasSize(3); + + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster-multiendpoint.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogmaPorts.get(0), + "", dogmaPorts.get(1), + "", dogmaPorts.get(2))); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + final XdsCentralDogmaBuilder builder = new XdsCentralDogmaBuilder(); + for (Integer port : serverPorts) { + builder.host("127.0.0.1", port); + } + final Set selectedPorts = new HashSet<>(); + try (CentralDogma client = builder.build()) { + await().untilAsserted(() -> assertThat(client.whenEndpointReady()).isDone()); + // RoundRobinStrategy guarantees that each port will be selected once + for (int i = 0; i < dogmaPorts.size(); i++) { + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + final ClientRequestContext ctx = captor.get(); + selectedPorts.add(ctx.endpoint().port()); + } + } + } + assertThat(selectedPorts).containsExactlyInAnyOrderElementsOf(dogmaPorts); + } + + @Test + void customListenerName() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", "my-listener", + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .listenerName("my-listener") + .host("127.0.0.1", server.httpPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java new file mode 100644 index 0000000000..e91de9e2ee --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/TlsUpstreamTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; + +import java.lang.reflect.Constructor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.grpc.GrpcClientBuilder; +import com.linecorp.armeria.common.CommonPools; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import com.linecorp.armeria.xds.XdsBootstrap; +import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Entry; +import com.linecorp.centraldogma.common.Query; +import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; + +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.controlplane.server.V3DiscoveryServer; +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.netty.util.concurrent.EventExecutor; + +class TlsUpstreamTest { + + private static final AtomicLong VERSION_NUMBER = new AtomicLong(); + private static final String GROUP = "key"; + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + @RegisterExtension + static CentralDogmaExtension dogma = new CentralDogmaExtension(true) { + + @Override + protected void scaffold(CentralDogma client) { + client.createProject("foo").join(); + client.createRepository("foo", "bar") + .join() + .commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }")) + .push() + .join(); + } + }; + + @RegisterExtension + static final ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.port(0, SessionProtocol.HTTPS); + sb.tlsSelfSigned(); + final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache); + sb.service(GrpcService.builder() + .addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl()) + .addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl()) + .build()); + } + }; + + @Test + void bootstrapTlsUpstreamTls() throws Exception { + // so that the xds server can also verify the access token is correctly set + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME, + "", "my-cluster")); + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "my-cluster", "", "STATIC", + "", dogma.serverAddress().getPort())); + cache.setSnapshot( + GROUP, + Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(), + ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(), + String.valueOf(VERSION_NUMBER.incrementAndGet()))); + + try (CentralDogma client = new XdsCentralDogmaBuilder() + .useTls(true) + .clientFactory(ClientFactory.insecure()) + .xdsBoostrapFactory(TlsUpstreamTest::insecureXdsBootstrap) + .host("127.0.0.1", server.httpsPort()).build()) { + final Entry entry = client.forRepo("foo", "bar") + .file(Query.ofJsonPath("/foo.json")) + .get() + .get(); + assertThatJson(entry.content()).node("a").isStringEqualTo("bar"); + } + } + + /** + * A dirty workaround to set {@link ClientFactory#insecure()} when making requests to the xDS server. + */ + private static XdsBootstrap insecureXdsBootstrap(Bootstrap bootstrap) { + try { + final Class bootstrapImplClazz = + TlsUpstreamTest.class.getClassLoader() + .loadClass("com.linecorp.armeria.xds.XdsBootstrapImpl"); + final Constructor ctor = + bootstrapImplClazz + .getDeclaredConstructor(Bootstrap.class, EventExecutor.class, Consumer.class); + ctor.setAccessible(true); + return (XdsBootstrap) ctor.newInstance(bootstrap, CommonPools.workerGroup().next(), + (Consumer) grpcClientBuilder -> { + grpcClientBuilder.factory(ClientFactory.insecure()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java new file mode 100644 index 0000000000..0715fc2399 --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReader.java @@ -0,0 +1,71 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Map.Entry; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.util.JsonFormat.Parser; +import com.google.protobuf.util.JsonFormat.TypeRegistry; + +import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; + +public final class XdsResourceReader { + + private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + private static final Parser parser = + JsonFormat.parser().usingTypeRegistry(TypeRegistry.newBuilder() + .add(HttpConnectionManager.getDescriptor()) + .add(Router.getDescriptor()) + .build()); + + private XdsResourceReader() {} + + public static T readResourcePath(String resourceName, GeneratedMessageV3.Builder builder) { + return readResourcePath(resourceName, builder, ImmutableMap.of()); + } + + @SuppressWarnings("unchecked") + public static T readResourcePath(String resourceName, GeneratedMessageV3.Builder builder, + Map variablesMap) { + final URL resource = XdsResourceReader.class.getResource(resourceName); + checkNotNull(resource, "Couldn't find resource (%s)", resourceName); + try { + String resourceStr = Resources.toString(resource, StandardCharsets.UTF_8); + for (Entry entry : variablesMap.entrySet()) { + resourceStr = resourceStr.replaceAll(entry.getKey(), entry.getValue().toString()); + } + final JsonNode jsonNode = mapper.reader().readTree(resourceStr); + parser.merge(jsonNode.toString(), builder); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (T) builder.build(); + } +} diff --git a/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java new file mode 100644 index 0000000000..e8b2ffe87e --- /dev/null +++ b/client/java-armeria-xds/src/test/java/com/linecorp/centraldogma/client/armeria/xds/XdsResourceReaderTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 LINE Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.centraldogma.client.armeria.xds; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableMap; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; + +class XdsResourceReaderTest { + + @Test + void basicCase() throws Exception { + final Listener listener = XdsResourceReader.readResourcePath( + "/test-listener.yaml", + Listener.newBuilder(), + ImmutableMap.of("", "listener_0", "", "my-cluster")); + assertThat(listener.getName()).isEqualTo("listener_0"); + final HttpConnectionManager manager = listener.getApiListener().getApiListener() + .unpack(HttpConnectionManager.class); + assertThat(manager.getRouteConfig().getName()).isEqualTo("local_route"); + assertThat(manager.getRouteConfig().getVirtualHosts(0).getRoutes(0) + .getRoute().getCluster()).isEqualTo("my-cluster"); + } + + @Test + void clusterReplacements() throws Exception { + final Cluster cluster = XdsResourceReader.readResourcePath( + "/test-cluster.yaml", + Cluster.newBuilder(), + ImmutableMap.of("", "test-cluster", "", "EDS", "", "8080")); + assertThat(cluster.getName()).isEqualTo("test-cluster"); + assertThat(cluster.getType()).isEqualTo(DiscoveryType.EDS); + final ClusterLoadAssignment loadAssignment = cluster.getLoadAssignment(); + assertThat(loadAssignment.getClusterName()).isEqualTo("test-cluster"); + assertThat(loadAssignment.getEndpoints(0).getLbEndpoints(0).getEndpoint().getAddress() + .getSocketAddress().getPortValue()) + .isEqualTo(8080); + } +} diff --git a/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml b/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml new file mode 100644 index 0000000000..fd2f359ed0 --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-cluster-multiendpoint.yaml @@ -0,0 +1,23 @@ +name: +connect_timeout: 0.25s +type: +lb_policy: ROUND_ROBIN +load_assignment: + cluster_name: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: diff --git a/client/java-armeria-xds/src/test/resources/test-cluster.yaml b/client/java-armeria-xds/src/test/resources/test-cluster.yaml new file mode 100644 index 0000000000..79e029f31e --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-cluster.yaml @@ -0,0 +1,13 @@ +name: +connect_timeout: 0.25s +type: +lb_policy: ROUND_ROBIN +load_assignment: + cluster_name: + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: diff --git a/client/java-armeria-xds/src/test/resources/test-listener.yaml b/client/java-armeria-xds/src/test/resources/test-listener.yaml new file mode 100644 index 0000000000..95491f1dda --- /dev/null +++ b/client/java-armeria-xds/src/test/resources/test-listener.yaml @@ -0,0 +1,22 @@ +name: +address: + socket_address: + address: 0.0.0.0 + port_value: 8080 +api_listener: + api_listener: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + http_protocol_options: + enable_trailers: true + codec_type: AUTO + route_config: + name: local_route + virtual_hosts: + - name: local_service + domains: [ "*" ] + routes: + - match: + prefix: "/" + route: + cluster: diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java index 89bb8b172e..d1c2dba167 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogmaBuilder.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.internal.client.ReplicationLagTolerantCentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; /** * Builds a {@link CentralDogma} client based on an Armeria @@ -50,7 +51,8 @@ public CentralDogma build() throws UnknownHostException { final CentralDogma dogma = new ArmeriaCentralDogma(blockingTaskExecutor, builder.build(WebClient.class), - accessToken()); + accessToken(), + endpointGroup::close); if (maxRetriesOnReplicationLag <= 0) { return dogma; } else { diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java index 5cbbe7edc5..ad5e28cd00 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/DnsAddressEndpointGroupConfigurator.java @@ -18,6 +18,7 @@ import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroupBuilder; import com.linecorp.centraldogma.client.CentralDogma; +import com.linecorp.centraldogma.internal.client.armeria.ArmeriaCentralDogma; /** * Configures the DNS resolution of the Armeria client of diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java similarity index 98% rename from client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java rename to client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java index 31764fb1a6..72ebd1f99c 100644 --- a/client/java-armeria/src/main/java/com/linecorp/centraldogma/client/armeria/ArmeriaCentralDogma.java +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/ArmeriaCentralDogma.java @@ -1,7 +1,7 @@ /* - * Copyright 2019 LINE Corporation + * Copyright 2024 LINE Corporation * - * LINE Corporation licenses this file to you under the Apache License, + * LY Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * @@ -14,7 +14,7 @@ * under the License. */ -package com.linecorp.centraldogma.client.armeria; +package com.linecorp.centraldogma.internal.client.armeria; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -108,7 +108,7 @@ import com.linecorp.centraldogma.internal.Util; import com.linecorp.centraldogma.internal.api.v1.WatchTimeout; -final class ArmeriaCentralDogma extends AbstractCentralDogma { +public final class ArmeriaCentralDogma extends AbstractCentralDogma { private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8); @@ -137,11 +137,14 @@ final class ArmeriaCentralDogma extends AbstractCentralDogma { private final WebClient client; private final String authorization; + private final SafeCloseable safeCloseable; - ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) { + public ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, + WebClient client, String accessToken, SafeCloseable safeCloseable) { super(blockingTaskExecutor); this.client = requireNonNull(client, "client"); authorization = "Bearer " + requireNonNull(accessToken, "accessToken"); + this.safeCloseable = safeCloseable; } @Override @@ -1136,4 +1139,9 @@ private static T handleErrorResponse(AggregatedHttpResponse res) { throw new CentralDogmaException("unexpected response: " + res.headers() + ", " + res.contentUtf8()); } + + @Override + public void close() { + safeCloseable.close(); + } } diff --git a/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java new file mode 100644 index 0000000000..a2f255756a --- /dev/null +++ b/client/java-armeria/src/main/java/com/linecorp/centraldogma/internal/client/armeria/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Armeria-based Central Dogma client internal implementation. + * + */ +@NonNullByDefault +package com.linecorp.centraldogma.internal.client.armeria; + +import com.linecorp.centraldogma.common.util.NonNullByDefault; diff --git a/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java b/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java index 5b94c531f5..ab5c729406 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/client/CentralDogma.java @@ -48,7 +48,7 @@ /** * Central Dogma client. */ -public interface CentralDogma { +public interface CentralDogma extends AutoCloseable { /** * Returns a new {@link CentralDogmaRepository} that is used to send a request to the specified @@ -850,4 +850,11 @@ Watcher repositoryWatcher(String projectName, String repositoryName, Stri * without additional delay. */ CompletableFuture whenEndpointReady(); + + /** + * Closes underlying resources that may be used when making requests to the server such as + * health checking or dns queries. + */ + @Override + void close() throws Exception; } diff --git a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java index 1114ff385c..0f91343724 100644 --- a/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java +++ b/client/java/src/main/java/com/linecorp/centraldogma/internal/client/ReplicationLagTolerantCentralDogma.java @@ -797,6 +797,11 @@ private static Object resultOrCause(@Nullable Object res, @Nullable Throwable ca return res != null ? res : cause; } + @Override + public void close() throws Exception { + delegate.close(); + } + private static final class RepoId { private final String projectName; private final String repositoryName; diff --git a/dependencies.toml b/dependencies.toml index 47601bd4a3..5c935fb416 100644 --- a/dependencies.toml +++ b/dependencies.toml @@ -203,6 +203,8 @@ javadocs = "https://fasterxml.github.io/jackson-core/javadoc/2.13/" [libraries.jackson-databind] module = "com.fasterxml.jackson.core:jackson-databind" javadocs = "https://fasterxml.github.io/jackson-databind/javadoc/2.13/" +[libraries.jackson-dataformat-yaml] +module = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml" [libraries.jackson-datatype-jsr310] module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" # Only used for testing. See JacksonRequestConverterFunctionTest for more information. diff --git a/settings.gradle b/settings.gradle index 3d90e116f5..30f63381bf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,6 +9,7 @@ includeWithFlags ':bom', 'bom' includeWithFlags ':client:java', 'java', 'publish', 'relocate' includeWithFlags ':client:java-armeria', 'java', 'publish', 'relocate' includeWithFlags ':client:java-armeria-legacy', 'java', 'publish', 'relocate' +includeWithFlags ':client:java-armeria-xds', 'java', 'publish', 'relocate' includeWithFlags ':client:java-spring-boot2-autoconfigure', 'java', 'publish', 'relocate', 'no_aggregation' includeWithFlags ':client:java-spring-boot2-starter', 'java', 'publish', 'relocate', 'no_aggregation' includeWithFlags ':client:java-spring-boot3-autoconfigure', 'java17', 'publish', 'relocate' diff --git a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java index 36fd7dcc34..4d0e9d2269 100644 --- a/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java +++ b/testing/common/src/main/java/com/linecorp/centraldogma/testing/internal/CentralDogmaRuleDelegate.java @@ -158,7 +158,7 @@ public final CompletableFuture startAsync(File dataDir) { throw new IOError(e); } - final String uri = "h2c://" + serverAddress.getHostString() + ':' + serverAddress.getPort(); + final String uri = "h2c://127.0.0.1:" + serverAddress.getPort(); final WebClientBuilder webClientBuilder = WebClient.builder(uri); if (accessToken != null) { webClientBuilder.auth(AuthToken.ofOAuth2(accessToken));