Skip to content

Commit

Permalink
Merge pull request #5472 from zhzhuang-zju/dial
Browse files Browse the repository at this point in the history
karmada-scheduler-estimator add the support for custom DNS suffix
  • Loading branch information
karmada-bot authored Oct 19, 2024
2 parents 4294e60 + 4dfb1ef commit fdc47f8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
9 changes: 5 additions & 4 deletions pkg/estimator/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -108,20 +109,20 @@ func EstablishConnection(kubeClient kubernetes.Interface, serviceInfo SchedulerE
return nil
}

serverAddr, err := resolveCluster(kubeClient, serviceInfo.Namespace,
serverAddrs, err := resolveCluster(kubeClient, serviceInfo.Namespace,
names.GenerateEstimatorServiceName(serviceInfo.NamePrefix, serviceInfo.Name), int32(grpcConfig.TargetPort))
if err != nil {
return err
}

klog.Infof("Start dialing estimator server(%s) of cluster(%s).", serverAddr, serviceInfo.Name)
cc, err := grpcConfig.DialWithTimeOut(serverAddr, 5*time.Second)
klog.Infof("Start dialing estimator server(%s) of cluster(%s).", strings.Join(serverAddrs, ","), serviceInfo.Name)
cc, err := grpcConfig.DialWithTimeOut(serverAddrs, 5*time.Second)
if err != nil {
klog.Errorf("Failed to dial cluster(%s): %v.", serviceInfo.Name, err)
return err
}
c := estimatorservice.NewEstimatorClient(cc)
estimatorCache.AddCluster(serviceInfo.Name, cc, c)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", serverAddr, serviceInfo.Name)
klog.Infof("Connection with estimator server(%s) of cluster(%s) has been established.", cc.Target(), serviceInfo.Name)
return nil
}
20 changes: 12 additions & 8 deletions pkg/estimator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,41 @@ import (
// ResolveCluster parses Service resource content by itself.
// Fixes Issue https://github.com/karmada-io/karmada/issues/2487
// Modified from "k8s.io/apiserver/pkg/util/proxy/proxy.go:92 => func ResolveCluster"
func resolveCluster(kubeClient kubernetes.Interface, namespace, id string, port int32) (string, error) {
func resolveCluster(kubeClient kubernetes.Interface, namespace, id string, port int32) ([]string, error) {
svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), id, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
/*
* When Deploying Karmada in Host Kubernetes Cluster, the kubeClient will connect kube-apiserver
* of Karmada Control Plane, rather than of host cluster.
* But the Service resource is defined in Host Kubernetes Cluster. So we cannot get its content here.
* The best thing we can do is just glue host:port together, and try to connect to it.
* The best thing we can do is just assemble hosts and ports according to a specific rule, and try to connect to them.
*/
return net.JoinHostPort(fmt.Sprintf("%s.%s.svc.cluster.local", id, namespace), fmt.Sprintf("%d", port)), nil
return []string{
net.JoinHostPort(fmt.Sprintf("%s.%s.svc.cluster.local", id, namespace), fmt.Sprintf("%d", port)),
// To support the environment with a custom DNS suffix.
net.JoinHostPort(fmt.Sprintf("%s.%s.svc", id, namespace), fmt.Sprintf("%d", port)),
}, nil
}

return "", err
return nil, err
}

if svc.Spec.Type != corev1.ServiceTypeExternalName {
// We only support ExternalName type here.
// See discussions in PR: https://github.com/karmada-io/karmada/pull/2574#discussion_r979539389
return "", fmt.Errorf("unsupported service type %q", svc.Spec.Type)
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
}

svcPort, err := findServicePort(svc, port)
if err != nil {
return "", err
return nil, err
}
if svcPort.TargetPort.Type != intstr.Int {
return "", fmt.Errorf("ExternalName service type should have int target port, "+
return nil, fmt.Errorf("ExternalName service type should have int target port, "+
"current target port: %v", svcPort.TargetPort)
}
return net.JoinHostPort(svc.Spec.ExternalName, fmt.Sprintf("%d", svcPort.TargetPort.IntVal)), nil
return []string{net.JoinHostPort(svc.Spec.ExternalName, fmt.Sprintf("%d", svcPort.TargetPort.IntVal))}, nil
}

// findServicePort finds the service port by name or numerically.
Expand Down
11 changes: 6 additions & 5 deletions pkg/estimator/client/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package client

import (
"context"
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
Expand All @@ -34,15 +35,15 @@ func TestResolveCluster(t *testing.T) {
port int32
service *corev1.Service
expectError bool
expected string
expected []string
}{
{
name: "Service not found",
namespace: "default",
id: "nonexistent",
port: 80,
service: nil,
expected: "nonexistent.default.svc.cluster.local:80",
expected: []string{"nonexistent.default.svc.cluster.local:80", "nonexistent.default.svc:80"},
},
{
name: "Unsupported service type",
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestResolveCluster(t *testing.T) {
},
},
},
expected: "example.com:8080",
expected: []string{"example.com:8080"},
},
{
name: "ExternalName service with non-int target port",
Expand Down Expand Up @@ -122,8 +123,8 @@ func TestResolveCluster(t *testing.T) {
if (err != nil) != tt.expectError {
t.Errorf("expected error: %v, got: %v", tt.expectError, err)
}
if result != tt.expected {
t.Errorf("expected: %s, got: %s", tt.expected, result)
if !reflect.DeepEqual(tt.expected, result) {
t.Errorf("expected: %v, got: %v", tt.expected, result)
}
})
}
Expand Down
27 changes: 22 additions & 5 deletions pkg/util/grpcconnection/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// ServerConfig the config of GRPC server side.
Expand Down Expand Up @@ -99,11 +100,8 @@ func (s *ServerConfig) NewServer() (*grpc.Server, error) {
return grpc.NewServer(grpc.Creds(grpccredentials.NewTLS(config))), nil
}

// DialWithTimeOut creates a client connection to the given target.
func (c *ClientConfig) DialWithTimeOut(path string, timeout time.Duration) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// DialWithTimeOut will attempt to create a client connection based on the given targets, one at a time, until a client connection is successfully established.
func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
}
Expand Down Expand Up @@ -138,6 +136,25 @@ func (c *ClientConfig) DialWithTimeOut(path string, timeout time.Duration) (*grp
}

opts = append(opts, grpc.WithTransportCredentials(cred))

var cc *grpc.ClientConn
var err error
var allErrs []error
for _, path := range paths {
cc, err = createGRPCConnection(path, timeout, opts...)
if err == nil {
return cc, nil
}
allErrs = append(allErrs, err)
}

return nil, utilerrors.NewAggregate(allErrs)
}

func createGRPCConnection(path string, timeout time.Duration, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cc, err := grpc.DialContext(ctx, path, opts...)
if err != nil {
return nil, fmt.Errorf("dial %s error: %v", path, err)
Expand Down

0 comments on commit fdc47f8

Please sign in to comment.