-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add xDS Kubernetes service #980
Conversation
This is ready. PTAL. 🙇 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some questions but looks good to me 👍 👍 👍
kubernetesWatchers.values().forEach(map -> { | ||
map.values().forEach(KubernetesEndpointGroup::closeAsync); | ||
map.clear(); | ||
}); | ||
kubernetesWatchers.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) Would this be called from plugins-for-*
threads here, whereas map updates will be called from k8s-plugin-executor
? i.e. the map doesn't seem to be thread-safe
I believe the stop
should also be done cleanly since it is not uncommon for a node to lose leadership
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, let me fix that. 😓
return; | ||
} | ||
executor().execute( | ||
() -> pushK8sEndpoints(kubernetesEndpointGroup, groupName, endpoints, watcherName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
() -> pushK8sEndpoints(kubernetesEndpointGroup, groupName, endpoints, watcherName)); | |
() -> pushK8sEndpoints(kubernetesEndpointGroup, groupName, endpoints, watcherName)); |
if (kubernetesEndpointGroup.isClosing()) { | ||
return; | ||
} | ||
final LocalityLbEndpoints.Builder localityLbEndpointsBuilder = LocalityLbEndpoints.newBuilder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note) I understood that Locality
is not set for this iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct. 👍
executor().execute( | ||
() -> pushK8sEndpoints(kubernetesEndpointGroup, groupName, endpoints, watcherName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit; there is probably a low chance this happens, but I understood that if listener invocation and delete occurs in quick succession, then it is possible that the endpoints remain even though it should be deleted.
I think it's fine to leave this as a known issue, but just want to check if this scenario is possible.
e.g.
handleXdsResource
is calledonFileRemoved
is called immediately- the
pushK8sEndpoints
invoked by step 1 is called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so because they are all executed by the executor()
.
1 handleXdsResource
is called by the executor
2 onFileRemoved
is called immediately
3 Thus, the created KubernetesEndpointGroup
is closing
4 the pushK8sEndpoints
invoked by step 1 is called by the executor but it doesn't push a commit because this condition:
Line 163 in 37e4002
if (kubernetesEndpointGroup.isClosing()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed that line 😅 Thanks for the pointer 👍
.withTrustCerts(kubernetesConfig.getTrustCerts()); | ||
|
||
if (!isNullOrEmpty(kubernetesConfig.getOauthToken())) { | ||
configBuilder.withOauthToken(kubernetesConfig.getOauthToken()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) Am I understanding correctly that as of now users can access this value via APIs? I'm wondering if we're going need to prioritize access control for xDS related APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users cannot see this value. Only creating, updating and deleting (without seeing the value) is possible at the moment.
@Override | ||
public void init(PluginInitContext pluginInitContext) { | ||
final InternalProjectInitializer projectInitializer = pluginInitContext.internalProjectInitializer(); | ||
pluginInitContext.commandExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could be removed?
|
||
public static final String DEFAULT_GROUP = "default_group"; | ||
@Nullable | ||
private volatile ControlPlaneService controlPlaneService; | ||
|
||
public static final long BACKOFF_SECONDS = 30; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems unsed.
@Override | ||
public CompletionStage<Void> start(PluginContext context) { | ||
return UnmodifiableFuture.completedFuture(null); | ||
} | ||
|
||
@Override | ||
public CompletionStage<Void> stop(PluginContext context) { | ||
stop = true; | ||
final ControlPlaneService controlPlaneService = this.controlPlaneService; | ||
assert controlPlaneService != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess controlPlaneService
could be null if projectInitializer.initialize(...)
fails.
executorService = ExecutorServiceMetrics.monitor( | ||
meterRegistry, Executors.newSingleThreadScheduledExecutor( | ||
new DefaultThreadFactory("k8s-plugin-executor", true)), "k8sPluginExecutor"); | ||
executorService.execute(this::start); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we wait for the result of this::start
? Otherwise, the exception raised in start
won't be propagated to somewhere or logged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, yeah. We need to wait for that. 😉
private static final Pattern WATCHERS_PATTERN = Pattern.compile("(?<=/k8s)/watchers/"); | ||
|
||
private final CommandExecutor commandExecutor; | ||
private final MeterRegistry meterRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: dead code?
|
||
service XdsKubernetesService { | ||
|
||
rpc CreateWatcher(CreateWatcherRequest) returns (Watcher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional) How about using service
, which seems a simpler expression than watcher
which is more abstract so we may need to explain what it means.
Meanwhile, we can simply say that service
indicates Kubenetes Service, and this method can be used to create xDS resource for Kubenetes Service.
https://kubernetes.io/docs/concepts/services-networking/service/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a discussion and will use CreateServiceEndpointWatcher
instead. 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @minwoox! 🚀🚀
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #980 +/- ##
============================================
+ Coverage 65.61% 71.54% +5.92%
- Complexity 3319 4113 +794
============================================
Files 355 402 +47
Lines 13865 16431 +2566
Branches 1498 1756 +258
============================================
+ Hits 9098 11756 +2658
+ Misses 3916 3642 -274
- Partials 851 1033 +182 ☔ View full report in Codecov by Sentry. |
Add xDS Kubernetes Service
Motivation:
To support integration with Kubernetes, we need APIs for creating, updating, and deleting Kubernetes watchers that retrieve pod endpoint information from the Kubernetes control plane.
Modifications:
xds_kubernetes.proto
files that define gRPC and HTTP APIs for xDS resources.XdsKubernetesService
:createWatcherRequest
stores watcher information at/k8s/watchers/{watcher_id}.json
.KubernetesEndpointGroup
is created and used.KubernetesEndpointGroup
for simplicity, with plans to develop a custom implementation for improved error handling.XdsKubernetesEndpointFetchingPlugin
andXdsKubernetesEndpointFetchingService
:XdsKubernetesEndpointFetchingService
, run by the ZooKeeper leader, createsClusterLoadAssignment
based on watcher information.ClusterLoadAssignment
is stored at/k8s/endpoints/{watcher_id}.json
and replicated via ZooKeeper log replay.ClusterLoadAssignment
isgroups/{group}/k8s/clusters/{watcher_id}.json
, and it is served as an xDS endpoint resource by theControlPlaneService
.Result: