From 4fb36262d2099279dfe49940986ad9da6ddecd1e Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 14 Apr 2025 23:05:29 +0530 Subject: [PATCH 1/9] feat: add the missing xds.authority label --- .../grpc/xds/XdsClientMetricReporterImpl.java | 36 ++++++++++++------- .../java/io/grpc/xds/client/XdsClient.java | 4 ++- .../io/grpc/xds/client/XdsClientImpl.java | 9 +++++ .../xds/XdsClientMetricReporterImplTest.java | 4 +-- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 0b592eb019e..915a81d0d41 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -17,6 +17,7 @@ package io.grpc.xds; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; @@ -29,12 +30,11 @@ import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.client.XdsClient.ServerConnectionCallback; +import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; + +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -90,7 +90,7 @@ final class XdsClientMetricReporterImpl implements XdsClientMetricReporter { Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false); RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources", "EXPERIMENTAL. Number of xDS resources.", "{resource}", - Arrays.asList("grpc.target", "grpc.xds.cache_state", + Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state", "grpc.xds.resource_type"), Collections.emptyList(), false); } @@ -143,7 +143,7 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { Map, Map> metadataByType = getResourceMetadataCompleted.get(10, TimeUnit.SECONDS); - computeAndReportResourceCounts(metadataByType, callback); + computeAndReportResourceCounts(xdsClient, metadataByType, callback); // Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS); @@ -155,21 +155,30 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { } } - private void computeAndReportResourceCounts( - Map, Map> metadataByType, - MetricReporterCallback callback) { + private void computeAndReportResourceCounts(XdsClient xdsClient, + Map, Map> metadataByType, + MetricReporterCallback callback) { for (Map.Entry, Map> metadataByTypeEntry : metadataByType.entrySet()) { XdsResourceType type = metadataByTypeEntry.getKey(); Map resourceCountsByState = new HashMap<>(); + List authorities = new ArrayList<>(); for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) { String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached()); resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1); } + for (String resourceName : metadataByTypeEntry.getValue().keySet()) { + authorities.add(xdsClient.getAuthority(type, resourceName)); + } - resourceCountsByState.forEach((cacheState, count) -> - callback.reportResourceCountGauge(count, cacheState, type.typeUrl())); + Iterator authorityIterator = authorities.iterator(); + resourceCountsByState.forEach((cacheState, count) -> { + if (authorityIterator.hasNext()) { + String authority = authorityIterator.next(); + callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl()); + } + }); } } @@ -200,10 +209,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback { } // TODO(dnvindhya): include the "authority" label once xds.authority is available. - void reportResourceCountGauge(long resourceCount, String cacheState, + void reportResourceCountGauge(String authority, long resourceCount, String cacheState, String resourceType) { recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, - Arrays.asList(target, cacheState, resourceType), Collections.emptyList()); + Arrays.asList(target, authority == null ? "#old" : authority, + cacheState, resourceType), Collections.emptyList()); } @Override diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index 1b53f6778c7..e36be6576e1 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -318,7 +318,9 @@ public Object getSecurityConfig() { getSubscribedResourcesMetadataSnapshot() { throw new UnsupportedOperationException(); } - + public String getAuthority(XdsResourceType resourceType, String resourceName) { + throw new UnsupportedOperationException(); + } /** * Registers a data watcher for the given Xds resource. */ diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 034779ed023..f8c0d3446f5 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -242,6 +242,15 @@ public void run() { return future; } + @Override + public String getAuthority(XdsResourceType resourceType, String resourceName) { + Map> resourceEntry = resourceSubscribers.get(resourceType); + if (resourceEntry != null) { + return resourceEntry.get(resourceName).authority; + } + return null; + } + @Override public Object getSecurityConfig() { return securityConfig; diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index df5ab87a1c0..be8a2aa71cf 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -199,7 +199,7 @@ public void metricGauges() { // Verify that reportResourceCounts and reportServerConnections were called // with the captured callback - callback.reportResourceCountGauge(10, "acked", resourceTypeUrl); + callback.reportResourceCountGauge("PotatoHead", 10, "acked", resourceTypeUrl); inOrder.verify(mockBatchRecorder) .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(), any()); @@ -222,7 +222,7 @@ public void metricReporterCallback() { eq(Lists.newArrayList())); String cacheState = "requested"; - callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl); + callback.reportResourceCountGauge("BuzzLightyear", 10, cacheState, resourceTypeUrl); verify(mockBatchRecorder, times(1)).recordLongGauge( eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), eq(Arrays.asList(target, cacheState, resourceTypeUrl)), From ce6266c96d41901a5d0aca0db9f15c8bdb25ec1d Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 15 Apr 2025 16:08:19 +0530 Subject: [PATCH 2/9] 1. xds: get the authority map from the XdsClient along with other data as in the existing flow 2. xds: modifies existing tests to account for authority --- .../grpc/xds/XdsClientMetricReporterImpl.java | 43 +++++++------- .../java/io/grpc/xds/client/XdsClient.java | 13 ++++- .../io/grpc/xds/client/XdsClientImpl.java | 30 ++++++++-- .../xds/XdsClientMetricReporterImplTest.java | 56 ++++++++++++++++--- 4 files changed, 107 insertions(+), 35 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 915a81d0d41..86de02e9159 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -17,7 +17,6 @@ package io.grpc.xds; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.LongCounterMetricInstrument; import io.grpc.LongGaugeMetricInstrument; @@ -30,11 +29,12 @@ import io.grpc.xds.client.XdsClient.ResourceMetadata; import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.client.XdsClient.ServerConnectionCallback; -import io.grpc.xds.client.XdsClientImpl; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; - -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -143,7 +143,13 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { Map, Map> metadataByType = getResourceMetadataCompleted.get(10, TimeUnit.SECONDS); - computeAndReportResourceCounts(xdsClient, metadataByType, callback); + ListenableFuture, Map>> + getResourceAuthorityCompleted = xdsClient.getSubscribedResourcesAuthoritySnapshot(); + + Map, Map> authorityByType = + getResourceAuthorityCompleted.get(10, TimeUnit.SECONDS); + + computeAndReportResourceCounts(metadataByType, authorityByType, callback); // Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS); @@ -155,30 +161,29 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { } } - private void computeAndReportResourceCounts(XdsClient xdsClient, - Map, Map> metadataByType, - MetricReporterCallback callback) { + private void computeAndReportResourceCounts( + Map, Map> metadataByType, + Map, Map> authorityByType, + MetricReporterCallback callback) { for (Map.Entry, Map> metadataByTypeEntry : metadataByType.entrySet()) { XdsResourceType type = metadataByTypeEntry.getKey(); Map resourceCountsByState = new HashMap<>(); - List authorities = new ArrayList<>(); - for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) { + Map authorityByState = new HashMap<>(); + for (Map.Entry metadataByName : + metadataByTypeEntry.getValue().entrySet()) { + String resourceName = metadataByName.getKey(); + ResourceMetadata metadata = metadataByName.getValue(); String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached()); resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1); - } - for (String resourceName : metadataByTypeEntry.getValue().keySet()) { - authorities.add(xdsClient.getAuthority(type, resourceName)); + authorityByState.put(cacheState, authorityByType.get(type).get(resourceName)); } - Iterator authorityIterator = authorities.iterator(); resourceCountsByState.forEach((cacheState, count) -> { - if (authorityIterator.hasNext()) { - String authority = authorityIterator.next(); - callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl()); - } - }); + callback.reportResourceCountGauge(authorityByState.get(cacheState), + count, cacheState, type.typeUrl()); + }); } } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index e36be6576e1..07d2f3ef82b 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -318,9 +318,20 @@ public Object getSecurityConfig() { getSubscribedResourcesMetadataSnapshot() { throw new UnsupportedOperationException(); } - public String getAuthority(XdsResourceType resourceType, String resourceName) { + + /** + * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as + * they are at the moment of the call. + * + *

The snapshot is a map from the "resource type" to + * a map ("resource name": "authority"). + */ + // Must be synchronized. + public ListenableFuture, Map>> + getSubscribedResourcesAuthoritySnapshot() { throw new UnsupportedOperationException(); } + /** * Registers a data watcher for the given Xds resource. */ diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index f8c0d3446f5..5c50354ed1e 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -242,13 +242,31 @@ public void run() { return future; } + // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic. + // ResourceTypes that do not have subscribers does not show up in the snapshot keys. @Override - public String getAuthority(XdsResourceType resourceType, String resourceName) { - Map> resourceEntry = resourceSubscribers.get(resourceType); - if (resourceEntry != null) { - return resourceEntry.get(resourceName).authority; - } - return null; + public ListenableFuture, Map>> + getSubscribedResourcesAuthoritySnapshot() { + final SettableFuture, Map>> future = + SettableFuture.create(); + syncContext.execute(new Runnable() { + @Override + public void run() { + // A map from a "resource type" to a map ("resource name": "authority") + ImmutableMap.Builder, Map> authoritySnapshot = + ImmutableMap.builder(); + for (XdsResourceType resourceType : resourceSubscribers.keySet()) { + ImmutableMap.Builder authorityMap = ImmutableMap.builder(); + for (Map.Entry> resourceEntry + : resourceSubscribers.get(resourceType).entrySet()) { + authorityMap.put(resourceEntry.getKey(), resourceEntry.getValue().authority); + } + authoritySnapshot.put(resourceType, authorityMap.buildOrThrow()); + } + future.set(authoritySnapshot.buildOrThrow()); + } + }); + return future; } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index be8a2aa71cf..8e18bc50f14 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -76,6 +76,7 @@ public class XdsClientMetricReporterImplTest { private static final String target = "test-target"; + private static final String authority = "test-authority"; private static final String server = "trafficdirector.googleapis.com"; private static final String resourceTypeUrl = "resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster"; @@ -101,7 +102,6 @@ public void setUp() { @Test public void reportResourceUpdates() { - // TODO(dnvindhya): add the "authority" label once available. reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl); verify(mockMetricRecorder).addLongCounter( eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10), @@ -129,6 +129,8 @@ public void setXdsClient_reportMetrics() throws Exception { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); + when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) + .thenReturn(Futures.immediateFuture(ImmutableMap.of())); when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) .thenReturn(future); reporter.setXdsClient(mockXdsClient); @@ -150,6 +152,8 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); + when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) + .thenReturn(Futures.immediateFuture(ImmutableMap.of())); // Create a future that will throw an exception SettableFuture serverConnectionsFeature = SettableFuture.create(); @@ -177,6 +181,8 @@ public void metricGauges() { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); + when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) + .thenReturn(Futures.immediateFuture(ImmutableMap.of())); when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) .thenReturn(future); reporter.setXdsClient(mockXdsClient); @@ -222,16 +228,17 @@ public void metricReporterCallback() { eq(Lists.newArrayList())); String cacheState = "requested"; - callback.reportResourceCountGauge("BuzzLightyear", 10, cacheState, resourceTypeUrl); + callback.reportResourceCountGauge(authority, 10, cacheState, resourceTypeUrl); verify(mockBatchRecorder, times(1)).recordLongGauge( eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), - eq(Arrays.asList(target, cacheState, resourceTypeUrl)), + eq(Arrays.asList(target, authority, cacheState, resourceTypeUrl)), eq(Collections.emptyList())); } @Test public void reportCallbackMetrics_computeAndReportResourceCounts() { Map, Map> metadataByType = new HashMap<>(); + Map, Map> authorityByType = new HashMap<>(); XdsResourceType listenerResource = XdsListenerResource.getInstance(); XdsResourceType routeConfigResource = XdsRouteConfigureResource.getInstance(); XdsResourceType clusterResource = XdsClusterResource.getInstance(); @@ -241,31 +248,44 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { long nanosLastUpdate = 1577923199_606042047L; Map ldsResourceMetadataMap = new HashMap<>(); + Map ldsAuthorityMap = new HashMap<>(); ldsResourceMetadataMap.put("resource1", ResourceMetadata.newResourceMetadataRequested()); + ldsAuthorityMap.put("resource1", "authority1"); ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42", nanosLastUpdate); ldsResourceMetadataMap.put("resource2", ackedLdsResource); + ldsAuthorityMap.put("resource2", "authority2"); ldsResourceMetadataMap.put("resource3", ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); + ldsAuthorityMap.put("resource3", "authority3"); ldsResourceMetadataMap.put("resource4", ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, "nacked after previous ack", true)); + ldsAuthorityMap.put("resource4", "authority4"); Map rdsResourceMetadataMap = new HashMap<>(); + Map rdsAuthorityMap = new HashMap<>(); ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested(); rdsResourceMetadataMap.put("resource5", ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", nanosLastUpdate, "nacked after request", false)); + rdsAuthorityMap.put("resource5", "authority5"); rdsResourceMetadataMap.put("resource6", ResourceMetadata.newResourceMetadataDoesNotExist()); + rdsAuthorityMap.put("resource6", "authority6"); Map cdsResourceMetadataMap = new HashMap<>(); + Map cdsAuthorityMap = new HashMap<>(); cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown()); + cdsAuthorityMap.put("resource7", "authority7"); metadataByType.put(listenerResource, ldsResourceMetadataMap); + authorityByType.put(listenerResource, ldsAuthorityMap); metadataByType.put(routeConfigResource, rdsResourceMetadataMap); + authorityByType.put(routeConfigResource, rdsAuthorityMap); metadataByType.put(clusterResource, cdsResourceMetadataMap); + authorityByType.put(clusterResource, cdsAuthorityMap); SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); @@ -277,36 +297,49 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) .thenReturn(getResourceMetadataCompleted); + ListenableFuture, Map>> + getResourceAuthorityCompleted = Futures.immediateFuture(authorityByType); + when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) + .thenReturn(getResourceAuthorityCompleted); + reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); // LDS resource requested verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any()); + eq(1L), eq(Arrays.asList(target, "authority1", + "requested", listenerResource.typeUrl())), any()); // LDS resources acked verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any()); + eq(2L), eq(Arrays.asList(target, "authority3", + "acked", listenerResource.typeUrl())), any()); // LDS resource nacked but cached verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any()); + eq(1L), eq(Arrays.asList(target, "authority4", + "nacked_but_cached", listenerResource.typeUrl())), any()); // RDS resource nacked verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any()); + eq(1L), eq(Arrays.asList(target, "authority5", + "nacked", routeConfigResource.typeUrl())), any()); // RDS resource does not exist verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any()); + eq(1L), eq(Arrays.asList(target, "authority6", + "does_not_exist", routeConfigResource.typeUrl())), any()); // CDS resource unknown verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any()); + eq(1L), eq(Arrays.asList(target, "authority7", + "unknown", clusterResource.typeUrl())), any()); verifyNoMoreInteractions(mockBatchRecorder); } @Test public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() { Map, Map> metadataByType = new HashMap<>(); + Map, Map> authorityByType = new HashMap<>(); XdsResourceType listenerResource = XdsListenerResource.getInstance(); metadataByType.put(listenerResource, Collections.emptyMap()); + authorityByType.put(listenerResource, Collections.emptyMap()); SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); @@ -318,6 +351,11 @@ public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources( when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) .thenReturn(getResourceMetadataCompleted); + ListenableFuture, Map>> + getAuthorityCompleted = Futures.immediateFuture(authorityByType); + when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) + .thenReturn(getAuthorityCompleted); + reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); // Verify that reportResourceCountGauge is never called From 6e6884695e868c0ec2b1372aef542323109ea4c0 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 15 Apr 2025 19:05:48 +0530 Subject: [PATCH 3/9] Rerun linux kokoro tests From 724f86e7fbd08d03908e44ae2ef9fd0c7de5f3be Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 21 Apr 2025 12:46:38 +0530 Subject: [PATCH 4/9] 1. xds: resolves data race 2. xds: considers authority as a separate dimension --- .../grpc/xds/XdsClientMetricReporterImpl.java | 55 ++++++++++++------- .../java/io/grpc/xds/client/XdsClient.java | 20 +++---- .../io/grpc/xds/client/XdsClientImpl.java | 39 ++++--------- .../xds/XdsClientMetricReporterImplTest.java | 45 ++++----------- 4 files changed, 65 insertions(+), 94 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 86de02e9159..a3018514a82 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -41,6 +42,7 @@ import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -143,13 +145,15 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { Map, Map> metadataByType = getResourceMetadataCompleted.get(10, TimeUnit.SECONDS); - ListenableFuture, Map>> - getResourceAuthorityCompleted = xdsClient.getSubscribedResourcesAuthoritySnapshot(); + List resourceNames = metadataByType.values() + .stream() + .flatMap(innerMap -> innerMap.keySet().stream()) + .collect(Collectors.toList()); - Map, Map> authorityByType = - getResourceAuthorityCompleted.get(10, TimeUnit.SECONDS); + Map resourceNameToAuthority = + xdsClient.getResourceNameToAuthorityMap(resourceNames); - computeAndReportResourceCounts(metadataByType, authorityByType, callback); + computeAndReportResourceCounts(metadataByType, resourceNameToAuthority, callback); // Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS); @@ -163,27 +167,37 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { private void computeAndReportResourceCounts( Map, Map> metadataByType, - Map, Map> authorityByType, + Map resourceNameToAuthority, MetricReporterCallback callback) { for (Map.Entry, Map> metadataByTypeEntry : metadataByType.entrySet()) { XdsResourceType type = metadataByTypeEntry.getKey(); + Map resources = metadataByTypeEntry.getValue(); - Map resourceCountsByState = new HashMap<>(); - Map authorityByState = new HashMap<>(); - for (Map.Entry metadataByName : - metadataByTypeEntry.getValue().entrySet()) { - String resourceName = metadataByName.getKey(); - ResourceMetadata metadata = metadataByName.getValue(); + Map> resourceCountsByAuthorityAndState = new HashMap<>(); + for (Map.Entry resourceEntry : resources.entrySet()) { + String resourceName = resourceEntry.getKey(); + ResourceMetadata metadata = resourceEntry.getValue(); + String authority = resourceNameToAuthority.getOrDefault(resourceName, ""); String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached()); - resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1); - authorityByState.put(cacheState, authorityByType.get(type).get(resourceName)); + resourceCountsByAuthorityAndState + .computeIfAbsent(authority, k -> new HashMap<>()) + .merge(cacheState, 1L, Long::sum); } - resourceCountsByState.forEach((cacheState, count) -> { - callback.reportResourceCountGauge(authorityByState.get(cacheState), - count, cacheState, type.typeUrl()); - }); + // Report metrics + for (Map.Entry> authorityEntry + : resourceCountsByAuthorityAndState.entrySet()) { + String authority = authorityEntry.getKey(); + Map stateCounts = authorityEntry.getValue(); + + for (Map.Entry stateEntry : stateCounts.entrySet()) { + String cacheState = stateEntry.getKey(); + Long count = stateEntry.getValue(); + + callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl()); + } + } } } @@ -213,12 +227,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback { this.target = target; } - // TODO(dnvindhya): include the "authority" label once xds.authority is available. void reportResourceCountGauge(String authority, long resourceCount, String cacheState, String resourceType) { recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, - Arrays.asList(target, authority == null ? "#old" : authority, - cacheState, resourceType), Collections.emptyList()); + Arrays.asList(target, authority.isEmpty() ? "#old" : authority, + cacheState, resourceType), Collections.emptyList()); } @Override diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index 07d2f3ef82b..ff179ccd786 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -319,19 +319,6 @@ public Object getSecurityConfig() { throw new UnsupportedOperationException(); } - /** - * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as - * they are at the moment of the call. - * - *

The snapshot is a map from the "resource type" to - * a map ("resource name": "authority"). - */ - // Must be synchronized. - public ListenableFuture, Map>> - getSubscribedResourcesAuthoritySnapshot() { - throw new UnsupportedOperationException(); - } - /** * Registers a data watcher for the given Xds resource. */ @@ -391,6 +378,13 @@ public Map getServerLrsClientMap() { throw new UnsupportedOperationException(); } + /** + * Returns a map of resource names to the authority. + */ + public Map getResourceNameToAuthorityMap(List resourceNames) { + throw new UnsupportedOperationException(); + } + /** Callback used to report a gauge metric value for server connections. */ public interface ServerConnectionCallback { void reportServerConnectionGauge(boolean isConnected, String xdsServer); diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 5c50354ed1e..d7fb323aea6 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -242,33 +243,6 @@ public void run() { return future; } - // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic. - // ResourceTypes that do not have subscribers does not show up in the snapshot keys. - @Override - public ListenableFuture, Map>> - getSubscribedResourcesAuthoritySnapshot() { - final SettableFuture, Map>> future = - SettableFuture.create(); - syncContext.execute(new Runnable() { - @Override - public void run() { - // A map from a "resource type" to a map ("resource name": "authority") - ImmutableMap.Builder, Map> authoritySnapshot = - ImmutableMap.builder(); - for (XdsResourceType resourceType : resourceSubscribers.keySet()) { - ImmutableMap.Builder authorityMap = ImmutableMap.builder(); - for (Map.Entry> resourceEntry - : resourceSubscribers.get(resourceType).entrySet()) { - authorityMap.put(resourceEntry.getKey(), resourceEntry.getValue().authority); - } - authoritySnapshot.put(resourceType, authorityMap.buildOrThrow()); - } - future.set(authoritySnapshot.buildOrThrow()); - } - }); - return future; - } - @Override public Object getSecurityConfig() { return securityConfig; @@ -572,6 +546,17 @@ private String getAuthority(String resource) { return authority; } + @Override + public Map getResourceNameToAuthorityMap(List resourceNames) { + if (resourceNames == null || resourceNames.isEmpty()) { + return Collections.emptyMap(); + } + return resourceNames.stream() + .collect(Collectors.toMap( + Function.identity(), + this::getAuthority)); + } + @Nullable private ImmutableList getServerInfos(String authority) { if (authority != null) { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index 8e18bc50f14..13497977a9b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; @@ -129,8 +130,6 @@ public void setXdsClient_reportMetrics() throws Exception { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); - when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) - .thenReturn(Futures.immediateFuture(ImmutableMap.of())); when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) .thenReturn(future); reporter.setXdsClient(mockXdsClient); @@ -152,8 +151,6 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); - when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) - .thenReturn(Futures.immediateFuture(ImmutableMap.of())); // Create a future that will throw an exception SettableFuture serverConnectionsFeature = SettableFuture.create(); @@ -181,8 +178,6 @@ public void metricGauges() { future.set(null); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( ImmutableMap.of())); - when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) - .thenReturn(Futures.immediateFuture(ImmutableMap.of())); when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) .thenReturn(future); reporter.setXdsClient(mockXdsClient); @@ -238,7 +233,7 @@ public void metricReporterCallback() { @Test public void reportCallbackMetrics_computeAndReportResourceCounts() { Map, Map> metadataByType = new HashMap<>(); - Map, Map> authorityByType = new HashMap<>(); + Map resourceNameByAuthority = new HashMap<>(); XdsResourceType listenerResource = XdsListenerResource.getInstance(); XdsResourceType routeConfigResource = XdsRouteConfigureResource.getInstance(); XdsResourceType clusterResource = XdsClusterResource.getInstance(); @@ -248,44 +243,37 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { long nanosLastUpdate = 1577923199_606042047L; Map ldsResourceMetadataMap = new HashMap<>(); - Map ldsAuthorityMap = new HashMap<>(); ldsResourceMetadataMap.put("resource1", ResourceMetadata.newResourceMetadataRequested()); - ldsAuthorityMap.put("resource1", "authority1"); + resourceNameByAuthority.put("resource1", "authority1"); ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42", nanosLastUpdate); ldsResourceMetadataMap.put("resource2", ackedLdsResource); - ldsAuthorityMap.put("resource2", "authority2"); + resourceNameByAuthority.put("resource2", "authority2"); ldsResourceMetadataMap.put("resource3", ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); - ldsAuthorityMap.put("resource3", "authority3"); + resourceNameByAuthority.put("resource3", "authority2"); ldsResourceMetadataMap.put("resource4", ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, "nacked after previous ack", true)); - ldsAuthorityMap.put("resource4", "authority4"); + resourceNameByAuthority.put("resource4", "authority4"); Map rdsResourceMetadataMap = new HashMap<>(); - Map rdsAuthorityMap = new HashMap<>(); ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested(); rdsResourceMetadataMap.put("resource5", ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", nanosLastUpdate, "nacked after request", false)); - rdsAuthorityMap.put("resource5", "authority5"); + resourceNameByAuthority.put("resource5", "authority5"); rdsResourceMetadataMap.put("resource6", ResourceMetadata.newResourceMetadataDoesNotExist()); - rdsAuthorityMap.put("resource6", "authority6"); + resourceNameByAuthority.put("resource6", "authority6"); Map cdsResourceMetadataMap = new HashMap<>(); - Map cdsAuthorityMap = new HashMap<>(); cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown()); - cdsAuthorityMap.put("resource7", "authority7"); metadataByType.put(listenerResource, ldsResourceMetadataMap); - authorityByType.put(listenerResource, ldsAuthorityMap); metadataByType.put(routeConfigResource, rdsResourceMetadataMap); - authorityByType.put(routeConfigResource, rdsAuthorityMap); metadataByType.put(clusterResource, cdsResourceMetadataMap); - authorityByType.put(clusterResource, cdsAuthorityMap); SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); @@ -297,10 +285,8 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) .thenReturn(getResourceMetadataCompleted); - ListenableFuture, Map>> - getResourceAuthorityCompleted = Futures.immediateFuture(authorityByType); - when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) - .thenReturn(getResourceAuthorityCompleted); + when(mockXdsClient.getResourceNameToAuthorityMap(anyList())) + .thenReturn(resourceNameByAuthority); reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); @@ -310,7 +296,7 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { "requested", listenerResource.typeUrl())), any()); // LDS resources acked verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(2L), eq(Arrays.asList(target, "authority3", + eq(2L), eq(Arrays.asList(target, "authority2", "acked", listenerResource.typeUrl())), any()); // LDS resource nacked but cached verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), @@ -328,7 +314,7 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { // CDS resource unknown verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority7", + eq(1L), eq(Arrays.asList(target, "#old", "unknown", clusterResource.typeUrl())), any()); verifyNoMoreInteractions(mockBatchRecorder); } @@ -336,10 +322,8 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { @Test public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() { Map, Map> metadataByType = new HashMap<>(); - Map, Map> authorityByType = new HashMap<>(); XdsResourceType listenerResource = XdsListenerResource.getInstance(); metadataByType.put(listenerResource, Collections.emptyMap()); - authorityByType.put(listenerResource, Collections.emptyMap()); SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); @@ -351,11 +335,6 @@ public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources( when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) .thenReturn(getResourceMetadataCompleted); - ListenableFuture, Map>> - getAuthorityCompleted = Futures.immediateFuture(authorityByType); - when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot()) - .thenReturn(getAuthorityCompleted); - reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); // Verify that reportResourceCountGauge is never called From 9ed94acfaf5d886ab6f4a8fdbca4ec3086617e30 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 21 Apr 2025 16:32:21 +0530 Subject: [PATCH 5/9] 1. xds: fixes possible NPE --- xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index a3018514a82..1476417cf4c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -230,7 +230,7 @@ static final class MetricReporterCallback implements ServerConnectionCallback { void reportResourceCountGauge(String authority, long resourceCount, String cacheState, String resourceType) { recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, - Arrays.asList(target, authority.isEmpty() ? "#old" : authority, + Arrays.asList(target, authority == null || authority.isEmpty() ? "#old" : authority, cacheState, resourceType), Collections.emptyList()); } From 339fc4c3866c74b8791c7df4a5074d8c5d52bb61 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Mon, 21 Apr 2025 21:54:33 +0530 Subject: [PATCH 6/9] xds: makes getAuthority() static --- .../grpc/xds/XdsClientMetricReporterImpl.java | 17 ++----- .../java/io/grpc/xds/client/XdsClient.java | 16 ++++-- .../io/grpc/xds/client/XdsClientImpl.java | 31 +----------- .../xds/XdsClientMetricReporterImplTest.java | 50 ++++++++----------- 4 files changed, 37 insertions(+), 77 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 1476417cf4c..1644ac857d3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -34,7 +34,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -42,7 +41,6 @@ import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -145,15 +143,7 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { Map, Map> metadataByType = getResourceMetadataCompleted.get(10, TimeUnit.SECONDS); - List resourceNames = metadataByType.values() - .stream() - .flatMap(innerMap -> innerMap.keySet().stream()) - .collect(Collectors.toList()); - - Map resourceNameToAuthority = - xdsClient.getResourceNameToAuthorityMap(resourceNames); - - computeAndReportResourceCounts(metadataByType, resourceNameToAuthority, callback); + computeAndReportResourceCounts(metadataByType, callback); // Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS); @@ -167,7 +157,6 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { private void computeAndReportResourceCounts( Map, Map> metadataByType, - Map resourceNameToAuthority, MetricReporterCallback callback) { for (Map.Entry, Map> metadataByTypeEntry : metadataByType.entrySet()) { @@ -178,7 +167,7 @@ private void computeAndReportResourceCounts( for (Map.Entry resourceEntry : resources.entrySet()) { String resourceName = resourceEntry.getKey(); ResourceMetadata metadata = resourceEntry.getValue(); - String authority = resourceNameToAuthority.getOrDefault(resourceName, ""); + String authority = XdsClient.getAuthorityFromResourceName(resourceName); String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached()); resourceCountsByAuthorityAndState .computeIfAbsent(authority, k -> new HashMap<>()) @@ -230,7 +219,7 @@ static final class MetricReporterCallback implements ServerConnectionCallback { void reportResourceCountGauge(String authority, long resourceCount, String cacheState, String resourceType) { recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, - Arrays.asList(target, authority == null || authority.isEmpty() ? "#old" : authority, + Arrays.asList(target, authority == null ? "#old" : authority, cacheState, resourceType), Collections.emptyList()); } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index ff179ccd786..167171651a7 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -379,10 +379,20 @@ public Map getServerLrsClientMap() { } /** - * Returns a map of resource names to the authority. + * Returns the authority from the resource name. */ - public Map getResourceNameToAuthorityMap(List resourceNames) { - throw new UnsupportedOperationException(); + public static String getAuthorityFromResourceName(String resourceNames) { + String authority; + if (resourceNames.startsWith(XDSTP_SCHEME)) { + URI uri = URI.create(resourceNames); + authority = uri.getAuthority(); + if (authority == null) { + authority = ""; + } + } else { + authority = null; + } + return authority; } /** Callback used to report a gauge metric value for server connections. */ diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index d7fb323aea6..4de8ead7c0a 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME; import static io.grpc.xds.client.XdsResourceType.ParsedResource; import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate; @@ -43,7 +42,6 @@ import io.grpc.xds.client.XdsClient.ResourceStore; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -57,7 +55,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -531,32 +528,6 @@ public Map getServerLrsClientMap() { return ImmutableMap.copyOf(serverLrsClientMap); } - private String getAuthority(String resource) { - String authority; - if (resource.startsWith(XDSTP_SCHEME)) { - URI uri = URI.create(resource); - authority = uri.getAuthority(); - if (authority == null) { - authority = ""; - } - } else { - authority = null; - } - - return authority; - } - - @Override - public Map getResourceNameToAuthorityMap(List resourceNames) { - if (resourceNames == null || resourceNames.isEmpty()) { - return Collections.emptyMap(); - } - return resourceNames.stream() - .collect(Collectors.toMap( - Function.identity(), - this::getAuthority)); - } - @Nullable private ImmutableList getServerInfos(String authority) { if (authority != null) { @@ -710,7 +681,7 @@ private final class ResourceSubscriber { syncContext.throwIfNotInThisSynchronizationContext(); this.type = type; this.resource = resource; - this.authority = getAuthority(resource); + this.authority = getAuthorityFromResourceName(resource); if (getServerInfos(authority) == null) { this.errorDescription = "Wrong configuration: xds server does not exist for resource " + resource; diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index 13497977a9b..b75d711196d 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -19,7 +19,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; @@ -233,43 +232,36 @@ public void metricReporterCallback() { @Test public void reportCallbackMetrics_computeAndReportResourceCounts() { Map, Map> metadataByType = new HashMap<>(); - Map resourceNameByAuthority = new HashMap<>(); XdsResourceType listenerResource = XdsListenerResource.getInstance(); XdsResourceType routeConfigResource = XdsRouteConfigureResource.getInstance(); XdsResourceType clusterResource = XdsClusterResource.getInstance(); Any rawListener = - Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build()); + Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build()); long nanosLastUpdate = 1577923199_606042047L; Map ldsResourceMetadataMap = new HashMap<>(); - ldsResourceMetadataMap.put("resource1", - ResourceMetadata.newResourceMetadataRequested()); - resourceNameByAuthority.put("resource1", "authority1"); + ldsResourceMetadataMap.put("xdstp://authority1", + ResourceMetadata.newResourceMetadataRequested()); ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42", - nanosLastUpdate); + nanosLastUpdate); ldsResourceMetadataMap.put("resource2", ackedLdsResource); - resourceNameByAuthority.put("resource2", "authority2"); ldsResourceMetadataMap.put("resource3", - ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); - resourceNameByAuthority.put("resource3", "authority2"); - ldsResourceMetadataMap.put("resource4", - ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, - "nacked after previous ack", true)); - resourceNameByAuthority.put("resource4", "authority4"); + ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); + ldsResourceMetadataMap.put("xdstp:/need_this", + ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, + "nacked after previous ack", true)); Map rdsResourceMetadataMap = new HashMap<>(); ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested(); - rdsResourceMetadataMap.put("resource5", - ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", - nanosLastUpdate, "nacked after request", false)); - resourceNameByAuthority.put("resource5", "authority5"); - rdsResourceMetadataMap.put("resource6", - ResourceMetadata.newResourceMetadataDoesNotExist()); - resourceNameByAuthority.put("resource6", "authority6"); + rdsResourceMetadataMap.put("xdstp://authority5", + ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", + nanosLastUpdate, "nacked after request", false)); + rdsResourceMetadataMap.put("xdstp://authority6", + ResourceMetadata.newResourceMetadataDoesNotExist()); Map cdsResourceMetadataMap = new HashMap<>(); - cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown()); + cdsResourceMetadataMap.put("xdstp://authority7", ResourceMetadata.newResourceMetadataUnknown()); metadataByType.put(listenerResource, ldsResourceMetadataMap); metadataByType.put(routeConfigResource, rdsResourceMetadataMap); @@ -278,15 +270,13 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class))) - .thenReturn(reportServerConnectionsCompleted); + .thenReturn(reportServerConnectionsCompleted); ListenableFuture, Map>> - getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); + getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) - .thenReturn(getResourceMetadataCompleted); + .thenReturn(getResourceMetadataCompleted); - when(mockXdsClient.getResourceNameToAuthorityMap(anyList())) - .thenReturn(resourceNameByAuthority); reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); @@ -296,11 +286,11 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { "requested", listenerResource.typeUrl())), any()); // LDS resources acked verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(2L), eq(Arrays.asList(target, "authority2", + eq(2L), eq(Arrays.asList(target, "#old", "acked", listenerResource.typeUrl())), any()); // LDS resource nacked but cached verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority4", + eq(1L), eq(Arrays.asList(target, "", "nacked_but_cached", listenerResource.typeUrl())), any()); // RDS resource nacked @@ -314,7 +304,7 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { // CDS resource unknown verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "#old", + eq(1L), eq(Arrays.asList(target, "authority7", "unknown", clusterResource.typeUrl())), any()); verifyNoMoreInteractions(mockBatchRecorder); } From cc3a85fa4a1c92e63bfa5140c44bbbb5003350d3 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 22 Apr 2025 10:59:22 +0530 Subject: [PATCH 7/9] 1. xds: refactor reportResourceCountGauge() signature 2. xds: move getAuthorityFromResourceName() near other static methods --- .../java/io/grpc/xds/client/XdsClient.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index 167171651a7..edbb0b2d74c 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -118,6 +118,23 @@ public static String percentEncodePath(String input) { return Joiner.on('/').join(encodedSegs); } + /** + * Returns the authority from the resource name. + */ + public static String getAuthorityFromResourceName(String resourceNames) { + String authority; + if (resourceNames.startsWith(XDSTP_SCHEME)) { + URI uri = URI.create(resourceNames); + authority = uri.getAuthority(); + if (authority == null) { + authority = ""; + } + } else { + authority = null; + } + return authority; + } + public interface ResourceUpdate {} /** @@ -378,23 +395,6 @@ public Map getServerLrsClientMap() { throw new UnsupportedOperationException(); } - /** - * Returns the authority from the resource name. - */ - public static String getAuthorityFromResourceName(String resourceNames) { - String authority; - if (resourceNames.startsWith(XDSTP_SCHEME)) { - URI uri = URI.create(resourceNames); - authority = uri.getAuthority(); - if (authority == null) { - authority = ""; - } - } else { - authority = null; - } - return authority; - } - /** Callback used to report a gauge metric value for server connections. */ public interface ServerConnectionCallback { void reportServerConnectionGauge(boolean isConnected, String xdsServer); From 10f54d8df4b99c29602b43c493e79fcd421284e3 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 22 Apr 2025 10:59:48 +0530 Subject: [PATCH 8/9] 1. xds: refactor reportResourceCountGauge() signature 2. xds: move getAuthorityFromResourceName() near other static methods --- .../main/java/io/grpc/xds/XdsClientMetricReporterImpl.java | 4 ++-- .../java/io/grpc/xds/XdsClientMetricReporterImplTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 1644ac857d3..24773195305 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -184,7 +184,7 @@ private void computeAndReportResourceCounts( String cacheState = stateEntry.getKey(); Long count = stateEntry.getValue(); - callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl()); + callback.reportResourceCountGauge(count, authority, cacheState, type.typeUrl()); } } } @@ -216,7 +216,7 @@ static final class MetricReporterCallback implements ServerConnectionCallback { this.target = target; } - void reportResourceCountGauge(String authority, long resourceCount, String cacheState, + void reportResourceCountGauge(long resourceCount, String authority, String cacheState, String resourceType) { recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, Arrays.asList(target, authority == null ? "#old" : authority, diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index b75d711196d..c3b04352c2c 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -199,7 +199,7 @@ public void metricGauges() { // Verify that reportResourceCounts and reportServerConnections were called // with the captured callback - callback.reportResourceCountGauge("PotatoHead", 10, "acked", resourceTypeUrl); + callback.reportResourceCountGauge(10, "PotatoHead", "acked", resourceTypeUrl); inOrder.verify(mockBatchRecorder) .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(), any()); @@ -222,7 +222,7 @@ public void metricReporterCallback() { eq(Lists.newArrayList())); String cacheState = "requested"; - callback.reportResourceCountGauge(authority, 10, cacheState, resourceTypeUrl); + callback.reportResourceCountGauge(10, authority, cacheState, resourceTypeUrl); verify(mockBatchRecorder, times(1)).recordLongGauge( eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), eq(Arrays.asList(target, authority, cacheState, resourceTypeUrl)), From 2e7c5b07a6a5e4c064fa604fa25956c1f0af17c9 Mon Sep 17 00:00:00 2001 From: AgraVator Date: Tue, 22 Apr 2025 13:01:36 +0530 Subject: [PATCH 9/9] xds: refactors line wrapping indentations --- .../grpc/xds/XdsClientMetricReporterImpl.java | 7 +- .../xds/XdsClientMetricReporterImplTest.java | 68 ++++++++++--------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java index 24773195305..5cfba11c065 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -176,7 +176,7 @@ private void computeAndReportResourceCounts( // Report metrics for (Map.Entry> authorityEntry - : resourceCountsByAuthorityAndState.entrySet()) { + : resourceCountsByAuthorityAndState.entrySet()) { String authority = authorityEntry.getKey(); Map stateCounts = authorityEntry.getValue(); @@ -218,9 +218,10 @@ static final class MetricReporterCallback implements ServerConnectionCallback { void reportResourceCountGauge(long resourceCount, String authority, String cacheState, String resourceType) { + // authority = #old, for non-xdstp resource names recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, - Arrays.asList(target, authority == null ? "#old" : authority, - cacheState, resourceType), Collections.emptyList()); + Arrays.asList(target, authority == null ? "#old" : authority, cacheState, resourceType), + Collections.emptyList()); } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java index c3b04352c2c..509a0025b7b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -175,8 +175,8 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() { public void metricGauges() { SettableFuture future = SettableFuture.create(); future.set(null); - when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( - ImmutableMap.of())); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) + .thenReturn(Futures.immediateFuture(ImmutableMap.of())); when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) .thenReturn(future); reporter.setXdsClient(mockXdsClient); @@ -199,13 +199,15 @@ public void metricGauges() { // Verify that reportResourceCounts and reportServerConnections were called // with the captured callback - callback.reportResourceCountGauge(10, "PotatoHead", "acked", resourceTypeUrl); + callback.reportResourceCountGauge(10, "MrPotatoHead", + "acked", resourceTypeUrl); inOrder.verify(mockBatchRecorder) .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(), any()); callback.reportServerConnectionGauge(true, "xdsServer"); inOrder.verify(mockBatchRecorder) - .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L), any(), any()); + .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.connected"), + eq(1L), any(), any()); inOrder.verifyNoMoreInteractions(); } @@ -236,29 +238,28 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { XdsResourceType routeConfigResource = XdsRouteConfigureResource.getInstance(); XdsResourceType clusterResource = XdsClusterResource.getInstance(); - Any rawListener = - Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build()); + Any rawListener = Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build()); long nanosLastUpdate = 1577923199_606042047L; Map ldsResourceMetadataMap = new HashMap<>(); ldsResourceMetadataMap.put("xdstp://authority1", - ResourceMetadata.newResourceMetadataRequested()); - ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42", - nanosLastUpdate); + ResourceMetadata.newResourceMetadataRequested()); + ResourceMetadata ackedLdsResource = + ResourceMetadata.newResourceMetadataAcked(rawListener, "42", nanosLastUpdate); ldsResourceMetadataMap.put("resource2", ackedLdsResource); ldsResourceMetadataMap.put("resource3", - ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); - ldsResourceMetadataMap.put("xdstp:/need_this", - ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, - "nacked after previous ack", true)); + ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); + ldsResourceMetadataMap.put("xdstp:/no_authority", + ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", + nanosLastUpdate, "nacked after previous ack", true)); Map rdsResourceMetadataMap = new HashMap<>(); ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested(); rdsResourceMetadataMap.put("xdstp://authority5", - ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", - nanosLastUpdate, "nacked after request", false)); + ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", + nanosLastUpdate, "nacked after request", false)); rdsResourceMetadataMap.put("xdstp://authority6", - ResourceMetadata.newResourceMetadataDoesNotExist()); + ResourceMetadata.newResourceMetadataDoesNotExist()); Map cdsResourceMetadataMap = new HashMap<>(); cdsResourceMetadataMap.put("xdstp://authority7", ResourceMetadata.newResourceMetadataUnknown()); @@ -270,42 +271,45 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() { SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); reportServerConnectionsCompleted.set(null); when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class))) - .thenReturn(reportServerConnectionsCompleted); + .thenReturn(reportServerConnectionsCompleted); ListenableFuture, Map>> - getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); + getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) - .thenReturn(getResourceMetadataCompleted); - + .thenReturn(getResourceMetadataCompleted); reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); // LDS resource requested verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority1", - "requested", listenerResource.typeUrl())), any()); + eq(1L), + eq(Arrays.asList(target, "authority1", "requested", listenerResource.typeUrl())), any()); // LDS resources acked + // authority = #old, for non-xdstp resource names verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(2L), eq(Arrays.asList(target, "#old", - "acked", listenerResource.typeUrl())), any()); + eq(2L), + eq(Arrays.asList(target, "#old", "acked", listenerResource.typeUrl())), any()); // LDS resource nacked but cached + // "" for missing authority in the resource name verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "", - "nacked_but_cached", listenerResource.typeUrl())), any()); + eq(1L), + eq(Arrays.asList(target, "", "nacked_but_cached", listenerResource.typeUrl())), any()); // RDS resource nacked verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority5", - "nacked", routeConfigResource.typeUrl())), any()); + eq(1L), + eq(Arrays.asList(target, "authority5", "nacked", routeConfigResource.typeUrl())), any()); // RDS resource does not exist verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority6", - "does_not_exist", routeConfigResource.typeUrl())), any()); + eq(1L), + eq(Arrays.asList(target, "authority6", "does_not_exist", routeConfigResource.typeUrl())), + any()); // CDS resource unknown verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), - eq(1L), eq(Arrays.asList(target, "authority7", - "unknown", clusterResource.typeUrl())), any()); + eq(1L), + eq(Arrays.asList(target, "authority7", "unknown", clusterResource.typeUrl())), + any()); verifyNoMoreInteractions(mockBatchRecorder); }