Skip to content

Commit

Permalink
Introduce XdsCentralDogmaBuilder (#984)
Browse files Browse the repository at this point in the history
**Motivation**

CentralDogma is used deployed in multiple regions/zones. Given that we
will soon be supporting zone-aware load balancing from Armeria's side,
we can use CentralDogma to take advantage of zone-aware load balancing.

For instance:
- we may route only reads to the CentralDogma in the same zone, whereas
write requests will be done to a single pre-designated zone.
- in DR situations, we can quickly route traffic between zones
dynamically

**Modifications**
- Introduced `XdsCentralDogmaBuilder`, which builds an
`ArmeriaCentralDogma` based on parameters
- Most methods in `AbstractArmeriaCentralDogmaBuilder` didn't make sense
for xDS, so `AbstractCentralDogmaBuilder` is extended directly
- Moved `ArmeriaCentralDogma` from the client package to the internal
package so that `XdsCentralDogmaBuilder` can also build an
`ArmeriaCentralDogma`.
- The `CentralDogma` created by `XdsCentralDogmaBuilder` creates an
`XdsBootstrap` internally that should be closed. For this reason,
`CentralDogma` now inherits `AutoCloseable` which closes underlying
resources such as `XdsBootstrap` or `EndpointGroup`.
- Misc) Modified `CentralDogmaRuleDelegate` since configuring a
`CentralDogmaExtension` with `server#port` threw an exception since the
ipv6 scope id wasn't understood.

**Result**
- `java-armeria-xds` client is introduced
  • Loading branch information
jrhee17 authored Aug 16, 2024
1 parent 69bf18f commit 96b9fc8
Show file tree
Hide file tree
Showing 21 changed files with 1,091 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ private static Throwable convertCause(Throwable cause) {
return convertedCause;
}

@Override
public void close() {
endpointGroup.close();
}

@FunctionalInterface
private interface ThriftCall<T> {
void apply(ThriftFuture<T> callback) throws TException;
Expand Down
11 changes: 11 additions & 0 deletions client/java-armeria-xds/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dependencies {
api project(':client:java-armeria')

// Armeria
api libs.armeria.xds

testImplementation libs.controlplane.server
testImplementation libs.controlplane.cache
testImplementation libs.armeria.junit5
testImplementation libs.jackson.dataformat.yaml
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* <a href="https://line.github.io/armeria/">Armeria</a>'s xDS-based Central Dogma client implementation.
* @see <a href="https://line.github.io/centraldogma/client-java.html" target="_blank">Java client library</a>
*/
@NonNullByDefault
package com.linecorp.centraldogma.client.armeria.xds;

import com.linecorp.centraldogma.common.util.NonNullByDefault;
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2024 LINE Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.client.armeria.xds;

import static com.linecorp.armeria.common.util.UnmodifiableFuture.completedFuture;
import static com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil.getAccessToken;
import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.common.auth.OAuth2Token;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.auth.AuthService;
import com.linecorp.armeria.server.auth.Authorizer;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthMessageUtil;
import com.linecorp.centraldogma.testing.internal.auth.TestAuthProviderFactory;
import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension;

import io.envoyproxy.controlplane.cache.v3.SimpleCache;
import io.envoyproxy.controlplane.cache.v3.Snapshot;
import io.envoyproxy.controlplane.server.V3DiscoveryServer;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.listener.v3.Listener;

class AuthUpstreamTest {

private static final AtomicLong VERSION_NUMBER = new AtomicLong();
private static final String GROUP = "key";
private static final SimpleCache<String> cache = new SimpleCache<>(node -> GROUP);

@RegisterExtension
static CentralDogmaExtension dogma = new CentralDogmaExtension() {

@Override
protected void configure(CentralDogmaBuilder builder) {
builder.administrators(TestAuthMessageUtil.USERNAME);
builder.authProviderFactory(new TestAuthProviderFactory());
}

@Override
protected void scaffold(CentralDogma client) {
client.createProject("foo").join();
client.createRepository("foo", "bar")
.join()
.commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }"))
.push()
.join();
}
};

private static final AtomicReference<String> accessTokenRef = new AtomicReference<>();

@RegisterExtension
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
final Authorizer<OAuth2Token> tokenAuthorizer =
(ctx, token) -> completedFuture(accessTokenRef.get().equals(token.accessToken()));
sb.decorator(AuthService.builder().addOAuth2(tokenAuthorizer).newDecorator());
final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache);
sb.service(GrpcService.builder()
.addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl())
.build());
}
};

@Test
void basicAuthCase() throws Exception {
final String accessToken = getAccessToken(dogma.httpClient(), TestAuthMessageUtil.USERNAME,
TestAuthMessageUtil.PASSWORD);
// so that the xds server can also verify the access token is correctly set
accessTokenRef.set(accessToken);

final Listener listener = XdsResourceReader.readResourcePath(
"/test-listener.yaml",
Listener.newBuilder(),
ImmutableMap.of("<LISTENER_NAME>", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME,
"<CLUSTER_NAME>", "my-cluster"));
final Cluster cluster = XdsResourceReader.readResourcePath(
"/test-cluster.yaml",
Cluster.newBuilder(),
ImmutableMap.of("<NAME>", "my-cluster", "<TYPE>", "STATIC",
"<PORT>", dogma.serverAddress().getPort()));
cache.setSnapshot(
GROUP,
Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(),
ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(),
String.valueOf(VERSION_NUMBER.incrementAndGet())));

try (CentralDogma client = new XdsCentralDogmaBuilder()
.accessToken(accessToken)
.host("127.0.0.1", server.httpPort()).build()) {
final Entry<JsonNode> entry = client.forRepo("foo", "bar")
.file(Query.ofJsonPath("/foo.json"))
.get()
.get();
assertThatJson(entry.content()).node("a").isStringEqualTo("bar");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright 2024 LINE Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.client.armeria.xds;

import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Entry;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension;

import io.envoyproxy.controlplane.cache.v3.SimpleCache;
import io.envoyproxy.controlplane.cache.v3.Snapshot;
import io.envoyproxy.controlplane.server.V3DiscoveryServer;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.listener.v3.Listener;

class SimpleXdsUpstreamTest {

private static final AtomicLong VERSION_NUMBER = new AtomicLong();
private static final String GROUP = "key";
private static final SimpleCache<String> cache = new SimpleCache<>(node -> GROUP);

@RegisterExtension
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.port(0, SessionProtocol.HTTP);
sb.port(0, SessionProtocol.HTTP);
sb.port(0, SessionProtocol.HTTP);
final V3DiscoveryServer v3DiscoveryServer = new V3DiscoveryServer(cache);
sb.service(GrpcService.builder()
.addService(v3DiscoveryServer.getAggregatedDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getListenerDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getClusterDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getRouteDiscoveryServiceImpl())
.addService(v3DiscoveryServer.getEndpointDiscoveryServiceImpl())
.build());
}
};

@RegisterExtension
static CentralDogmaExtension dogma = new CentralDogmaExtension() {

@Override
protected void configure(CentralDogmaBuilder builder) {
builder.port(0, SessionProtocol.HTTP);
builder.port(0, SessionProtocol.HTTP);
}

@Override
protected void scaffold(CentralDogma client) {
client.createProject("foo").join();
client.createRepository("foo", "bar")
.join()
.commit("Initial file", Change.ofJsonUpsert("/foo.json", "{ \"a\": \"bar\" }"))
.push()
.join();
}
};

@Test
void singleBootstrapSingleUpstream() throws Exception {
final Listener listener = XdsResourceReader.readResourcePath(
"/test-listener.yaml",
Listener.newBuilder(),
ImmutableMap.of("<LISTENER_NAME>", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME,
"<CLUSTER_NAME>", "my-cluster"));
final Cluster cluster = XdsResourceReader.readResourcePath(
"/test-cluster.yaml",
Cluster.newBuilder(),
ImmutableMap.of("<NAME>", "my-cluster", "<TYPE>", "STATIC",
"<PORT>", dogma.serverAddress().getPort()));
cache.setSnapshot(
GROUP,
Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(),
ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(),
String.valueOf(VERSION_NUMBER.incrementAndGet())));

try (CentralDogma client = new XdsCentralDogmaBuilder().host("127.0.0.1", server.httpPort()).build()) {
final Entry<JsonNode> entry = client.forRepo("foo", "bar")
.file(Query.ofJsonPath("/foo.json"))
.get()
.get();
assertThatJson(entry.content()).node("a").isStringEqualTo("bar");
}
}

@Test
void multiBootstrapMultiUpstream() throws Exception {
final List<Integer> dogmaPorts = dogma.dogma().activePorts().values().stream().map(
port -> port.localAddress().getPort()).collect(Collectors.toList());
final List<Integer> serverPorts = server.server().activePorts().values().stream().map(
port -> port.localAddress().getPort()).collect(Collectors.toList());
assertThat(dogmaPorts).hasSize(3);

final Listener listener = XdsResourceReader.readResourcePath(
"/test-listener.yaml",
Listener.newBuilder(),
ImmutableMap.of("<LISTENER_NAME>", XdsCentralDogmaBuilder.DEFAULT_LISTENER_NAME,
"<CLUSTER_NAME>", "my-cluster"));
final Cluster cluster = XdsResourceReader.readResourcePath(
"/test-cluster-multiendpoint.yaml",
Cluster.newBuilder(),
ImmutableMap.of("<NAME>", "my-cluster", "<TYPE>", "STATIC",
"<PORT1>", dogmaPorts.get(0),
"<PORT2>", dogmaPorts.get(1),
"<PORT3>", dogmaPorts.get(2)));
cache.setSnapshot(
GROUP,
Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(),
ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(),
String.valueOf(VERSION_NUMBER.incrementAndGet())));

final XdsCentralDogmaBuilder builder = new XdsCentralDogmaBuilder();
for (Integer port : serverPorts) {
builder.host("127.0.0.1", port);
}
final Set<Integer> selectedPorts = new HashSet<>();
try (CentralDogma client = builder.build()) {
await().untilAsserted(() -> assertThat(client.whenEndpointReady()).isDone());
// RoundRobinStrategy guarantees that each port will be selected once
for (int i = 0; i < dogmaPorts.size(); i++) {
try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) {
final Entry<JsonNode> entry = client.forRepo("foo", "bar")
.file(Query.ofJsonPath("/foo.json"))
.get()
.get();
assertThatJson(entry.content()).node("a").isStringEqualTo("bar");
final ClientRequestContext ctx = captor.get();
selectedPorts.add(ctx.endpoint().port());
}
}
}
assertThat(selectedPorts).containsExactlyInAnyOrderElementsOf(dogmaPorts);
}

@Test
void customListenerName() throws Exception {
final Listener listener = XdsResourceReader.readResourcePath(
"/test-listener.yaml",
Listener.newBuilder(),
ImmutableMap.of("<LISTENER_NAME>", "my-listener",
"<CLUSTER_NAME>", "my-cluster"));
final Cluster cluster = XdsResourceReader.readResourcePath(
"/test-cluster.yaml",
Cluster.newBuilder(),
ImmutableMap.of("<NAME>", "my-cluster", "<TYPE>", "STATIC",
"<PORT>", dogma.serverAddress().getPort()));
cache.setSnapshot(
GROUP,
Snapshot.create(ImmutableList.of(cluster), ImmutableList.of(),
ImmutableList.of(listener), ImmutableList.of(), ImmutableList.of(),
String.valueOf(VERSION_NUMBER.incrementAndGet())));

try (CentralDogma client = new XdsCentralDogmaBuilder()
.listenerName("my-listener")
.host("127.0.0.1", server.httpPort()).build()) {
final Entry<JsonNode> entry = client.forRepo("foo", "bar")
.file(Query.ofJsonPath("/foo.json"))
.get()
.get();
assertThatJson(entry.content()).node("a").isStringEqualTo("bar");
}
}
}
Loading

0 comments on commit 96b9fc8

Please sign in to comment.