diff --git a/config/config_params.go b/config/config_params.go index 38ab2d25d7..be7b5f68f5 100644 --- a/config/config_params.go +++ b/config/config_params.go @@ -32,6 +32,7 @@ import ( "github.com/projectcalico/libcalico-go/lib/numorstring" "github.com/projectcalico/felix/idalloc" + "github.com/projectcalico/typha/pkg/discovery" ) var ( @@ -766,6 +767,13 @@ func (config *Config) OverrideParam(name, value string) (bool, error) { return config.UpdateFrom(config.internalOverrides, InternalOverride) } +func (config *Config) TyphaDiscoveryOpts() []discovery.Option { + return []discovery.Option{ + discovery.WithAddrOverride(config.TyphaAddr), + discovery.WithKubeService(config.TyphaK8sNamespace, config.TyphaK8sServiceName), + } +} + func New() *Config { if knownParams == nil { loadParams() diff --git a/daemon/daemon.go b/daemon/daemon.go index bb07d1e297..59f8725d59 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -34,21 +34,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "github.com/projectcalico/felix/buildinfo" - "github.com/projectcalico/felix/calc" - "github.com/projectcalico/felix/config" - _ "github.com/projectcalico/felix/config" - dp "github.com/projectcalico/felix/dataplane" - "github.com/projectcalico/felix/jitter" - "github.com/projectcalico/felix/logutils" - "github.com/projectcalico/felix/policysync" - "github.com/projectcalico/felix/proto" - "github.com/projectcalico/felix/statusrep" - "github.com/projectcalico/felix/usagerep" "github.com/projectcalico/libcalico-go/lib/apiconfig" apiv3 "github.com/projectcalico/libcalico-go/lib/apis/v3" "github.com/projectcalico/libcalico-go/lib/backend" @@ -65,7 +53,20 @@ import ( "github.com/projectcalico/libcalico-go/lib/options" "github.com/projectcalico/libcalico-go/lib/set" "github.com/projectcalico/pod2daemon/binder" + "github.com/projectcalico/typha/pkg/discovery" "github.com/projectcalico/typha/pkg/syncclient" + + "github.com/projectcalico/felix/buildinfo" + "github.com/projectcalico/felix/calc" + "github.com/projectcalico/felix/config" + _ "github.com/projectcalico/felix/config" + dp "github.com/projectcalico/felix/dataplane" + "github.com/projectcalico/felix/jitter" + "github.com/projectcalico/felix/logutils" + "github.com/projectcalico/felix/policysync" + "github.com/projectcalico/felix/proto" + "github.com/projectcalico/felix/statusrep" + "github.com/projectcalico/felix/usagerep" ) const ( @@ -1206,68 +1207,8 @@ func (fc *DataplaneConnector) Start() { go fc.handleWireguardStatUpdateFromDataplane() } -var ErrServiceNotReady = errors.New("Kubernetes service missing IP or port.") - func discoverTyphaAddr(configParams *config.Config, k8sClientSet kubernetes.Interface) (string, error) { - if configParams.TyphaAddr != "" { - // Explicit address; trumps other sources of config. - return configParams.TyphaAddr, nil - } - - if configParams.TyphaK8sServiceName == "" { - // No explicit address, and no service name, not using Typha. - return "", nil - } - - if k8sClientSet == nil { - return "", errors.New("failed to look up Typha, no Kubernetes client available") - } - - // If we get here, we need to look up the Typha service endpoints using the k8s API. - epClient := k8sClientSet.CoreV1().Endpoints(configParams.TyphaK8sNamespace) - eps, err := epClient.Get(context.Background(), configParams.TyphaK8sServiceName, metav1.GetOptions{}) - if err != nil { - log.WithError(err).Error("Unable to get Typha service endpoints from Kubernetes.") - return "", err - } - - candidates := set.New() - - for _, subset := range eps.Subsets { - var portForOurVersion int32 - for _, port := range subset.Ports { - if port.Name == "calico-typha" { - portForOurVersion = port.Port - break - } - } - - if portForOurVersion == 0 { - continue - } - - // If we get here, this endpoint supports the typha port we're looking for. - for _, h := range subset.Addresses { - typhaAddr := net.JoinHostPort(h.IP, fmt.Sprint(portForOurVersion)) - candidates.Add(typhaAddr) - } - } - - if candidates.Len() == 0 { - log.Error("Didn't find any ready Typha instances.") - return "", ErrServiceNotReady - } - - var addrs []string - candidates.Iter(func(item interface{}) error { - typhaAddr := item.(string) - addrs = append(addrs, typhaAddr) - return nil - }) - log.WithField("addrs", addrs).Info("Found ready Typha addresses.") - n := rand.Intn(len(addrs)) - chosenAddr := addrs[n] - log.WithField("choice", chosenAddr).Info("Chose Typha to connect to.") - - return chosenAddr, nil + typhaDiscoveryOpts := configParams.TyphaDiscoveryOpts() + typhaDiscoveryOpts = append(typhaDiscoveryOpts, discovery.WithKubeClient(k8sClientSet)) + return discovery.DiscoverTyphaAddr(typhaDiscoveryOpts...) }